Merge remote-tracking branch 'origin/master' into cleanup-connection-pool-surroundings

Igor Nikonov 2024-02-02 12:15:09 +00:00
commit eb89db0343
58 changed files with 678 additions and 295 deletions

.gitmodules

@@ -99,7 +99,7 @@
 	url = https://github.com/awslabs/aws-c-event-stream
 [submodule "aws-c-common"]
 	path = contrib/aws-c-common
-	url = https://github.com/ClickHouse/aws-c-common
+	url = https://github.com/awslabs/aws-c-common.git
 [submodule "aws-checksums"]
 	path = contrib/aws-checksums
 	url = https://github.com/awslabs/aws-checksums


@@ -166,6 +166,12 @@ set (SRCS
 )

 add_library (_poco_foundation ${SRCS})

+target_link_libraries (_poco_foundation
+    PUBLIC
+        boost::headers_only
+        boost::system
+)
+
 add_library (Poco::Foundation ALIAS _poco_foundation)

 # TODO: remove these warning exclusions


@@ -22,6 +22,9 @@
 #include <cstddef>
 #include <map>
 #include <vector>
+#include <boost/smart_ptr/intrusive_ptr.hpp>
+
 #include "Poco/Channel.h"
 #include "Poco/Format.h"
 #include "Poco/Foundation.h"
@@ -34,7 +37,7 @@ namespace Poco
 class Exception;
 class Logger;
-using LoggerPtr = std::shared_ptr<Logger>;
+using LoggerPtr = boost::intrusive_ptr<Logger>;

 class Foundation_API Logger : public Channel
     /// Logger is a special Channel that acts as the main
@@ -871,21 +874,11 @@ public:
         /// If the Logger does not yet exist, it is created, based
         /// on its parent logger.

-    static LoggerPtr getShared(const std::string & name);
+    static LoggerPtr getShared(const std::string & name, bool should_be_owned_by_shared_ptr_if_created = true);
         /// Returns a shared pointer to the Logger with the given name.
         /// If the Logger does not yet exist, it is created, based
         /// on its parent logger.

-    static Logger & unsafeGet(const std::string & name);
-        /// Returns a reference to the Logger with the given name.
-        /// If the Logger does not yet exist, it is created, based
-        /// on its parent logger.
-        ///
-        /// WARNING: This method is not thread safe. You should
-        /// probably use get() instead.
-        /// The only time this method should be used is during
-        /// program initialization, when only one thread is running.
-
     static Logger & create(const std::string & name, Channel * pChannel, int level = Message::PRIO_INFORMATION);
         /// Creates and returns a reference to a Logger with the
         /// given name. The Logger's Channel and log level as set as
@@ -932,6 +925,16 @@ public:
     static const std::string ROOT; /// The name of the root logger ("").

+public:
+    struct LoggerEntry
+    {
+        Poco::Logger * logger;
+        bool owned_by_shared_ptr = false;
+    };
+
+    using LoggerMap = std::unordered_map<std::string, LoggerEntry>;
+    using LoggerMapIterator = LoggerMap::iterator;
+
 protected:
     Logger(const std::string & name, Channel * pChannel, int level);
     ~Logger();
@@ -940,12 +943,19 @@ protected:
     void log(const std::string & text, Message::Priority prio, const char * file, int line);
     static std::string format(const std::string & fmt, int argc, std::string argv[]);

-    static Logger & unsafeCreate(const std::string & name, Channel * pChannel, int level = Message::PRIO_INFORMATION);
-    static Logger & parent(const std::string & name);
-    static void add(Logger * pLogger);
-    static Logger * find(const std::string & name);
-
 private:
+    static std::pair<Logger::LoggerMapIterator, bool> unsafeGet(const std::string & name, bool get_shared);
+    static Logger * unsafeGetRawPtr(const std::string & name);
+    static std::pair<LoggerMapIterator, bool> unsafeCreate(const std::string & name, Channel * pChannel, int level = Message::PRIO_INFORMATION);
+    static Logger & parent(const std::string & name);
+    static std::pair<LoggerMapIterator, bool> add(Logger * pLogger);
+    static std::optional<LoggerMapIterator> find(const std::string & name);
+    static Logger * findRawPtr(const std::string & name);
+
+    friend void intrusive_ptr_add_ref(Logger * ptr);
+    friend void intrusive_ptr_release(Logger * ptr);
+
     Logger();
     Logger(const Logger &);
     Logger & operator=(const Logger &);


@@ -53,11 +53,10 @@ protected:
     virtual ~RefCountedObject();
         /// Destroys the RefCountedObject.

+    mutable std::atomic<size_t> _counter;
+
 private:
     RefCountedObject(const RefCountedObject &);
     RefCountedObject & operator=(const RefCountedObject &);
-
-    mutable std::atomic<size_t> _counter;
 };
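The move of `_counter` from the private to the protected section is load-bearing: the new `intrusive_ptr_release(Logger *)` hook in Logger.cpp below is a friend of the derived `Logger` class and decrements `ptr->_counter` directly, which C++ only permits for a protected base member when it is reached through the derived type. A minimal sketch of that access rule (names illustrative, not from the patch):

    #include <atomic>
    #include <cstddef>

    struct Base
    {
    protected:
        mutable std::atomic<size_t> counter{1};
    };

    struct Derived : Base
    {
        friend void release(Derived * ptr);
    };

    // Allowed: friends of Derived may touch Base's protected members,
    // but only through a pointer or reference to Derived (or further derived).
    void release(Derived * ptr)
    {
        if (ptr->counter.fetch_sub(1, std::memory_order_acq_rel) == 1)
            delete ptr;
    }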


@@ -38,14 +38,7 @@ std::mutex & getLoggerMutex()
     return *logger_mutex;
 }

-struct LoggerEntry
-{
-    Poco::Logger * logger;
-    bool owned_by_shared_ptr = false;
-};
-
-using LoggerMap = std::unordered_map<std::string, LoggerEntry>;
-LoggerMap * _pLoggerMap = nullptr;
+Poco::Logger::LoggerMap * _pLoggerMap = nullptr;

 }
@@ -309,38 +302,9 @@ void Logger::formatDump(std::string& message, const void* buffer, std::size_t le
 namespace
 {

-struct LoggerDeleter
-{
-    void operator()(Poco::Logger * logger)
-    {
-        std::lock_guard<std::mutex> lock(getLoggerMutex());
-
-        /// If logger infrastructure is destroyed just decrement logger reference count
-        if (!_pLoggerMap)
-        {
-            logger->release();
-            return;
-        }
-
-        auto it = _pLoggerMap->find(logger->name());
-        assert(it != _pLoggerMap->end());
-
-        /** If reference count is 1, this means this shared pointer owns logger
-          * and need destroy it.
-          */
-        size_t reference_count_before_release = logger->release();
-        if (reference_count_before_release == 1)
-        {
-            assert(it->second.owned_by_shared_ptr);
-            _pLoggerMap->erase(it);
-        }
-    }
-};
-
 inline LoggerPtr makeLoggerPtr(Logger & logger)
 {
-    return std::shared_ptr<Logger>(&logger, LoggerDeleter());
+    return LoggerPtr(&logger, false /*add_ref*/);
 }

 }
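The rewritten `makeLoggerPtr` leans on two standard `boost::intrusive_ptr` mechanisms: the hook pair `intrusive_ptr_add_ref`/`intrusive_ptr_release` found by argument-dependent lookup, and the two-argument constructor whose `add_ref = false` mode adopts a reference the caller already owns rather than taking a new one. A self-contained sketch of both (the `Widget` type is illustrative, not part of the patch):

    #include <boost/smart_ptr/intrusive_ptr.hpp>
    #include <atomic>
    #include <cstddef>

    struct Widget
    {
        std::atomic<size_t> refs{0};
    };

    // Hooks discovered via argument-dependent lookup on Widget *.
    void intrusive_ptr_add_ref(Widget * w) { w->refs.fetch_add(1, std::memory_order_relaxed); }
    void intrusive_ptr_release(Widget * w)
    {
        if (w->refs.fetch_sub(1, std::memory_order_acq_rel) == 1)
            delete w;
    }

    int main()
    {
        auto * raw = new Widget;
        raw->refs.fetch_add(1, std::memory_order_relaxed);             // a reference produced by hand

        boost::intrusive_ptr<Widget> adopted(raw, /*add_ref=*/ false); // adopts it; count stays 1
        boost::intrusive_ptr<Widget> shared(raw, /*add_ref=*/ true);   // takes its own; count becomes 2
        // Both destructors call intrusive_ptr_release; the Widget is deleted exactly once.
    }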
@@ -350,64 +314,87 @@ Logger& Logger::get(const std::string& name)
 {
     std::lock_guard<std::mutex> lock(getLoggerMutex());

-    Logger & logger = unsafeGet(name);
-
-    /** If there are already shared pointer created for this logger
-      * we need to increment Logger reference count and now logger
-      * is owned by logger infrastructure.
-      */
-    auto it = _pLoggerMap->find(name);
-    if (it->second.owned_by_shared_ptr)
-    {
-        it->second.logger->duplicate();
-        it->second.owned_by_shared_ptr = false;
-    }
-
-    return logger;
+    auto [it, inserted] = unsafeGet(name, false /*get_shared*/);
+    return *it->second.logger;
 }

-LoggerPtr Logger::getShared(const std::string & name)
+LoggerPtr Logger::getShared(const std::string & name, bool should_be_owned_by_shared_ptr_if_created)
 {
     std::lock_guard<std::mutex> lock(getLoggerMutex());
-    bool logger_exists = _pLoggerMap && _pLoggerMap->contains(name);
+    auto [it, inserted] = unsafeGet(name, true /*get_shared*/);

-    Logger & logger = unsafeGet(name);
-
-    /** If logger already exists, then this shared pointer does not own it.
-      * If logger does not exists, logger infrastructure could be already destroyed
-      * or logger was created.
+    /** If during `unsafeGet` logger was created, then this shared pointer owns it.
+      * If logger was already created, then this shared pointer does not own it.
       */
-    if (logger_exists)
+    if (inserted)
     {
-        logger.duplicate();
-    }
-    else if (_pLoggerMap)
-    {
-        _pLoggerMap->find(name)->second.owned_by_shared_ptr = true;
+        if (should_be_owned_by_shared_ptr_if_created)
+            it->second.owned_by_shared_ptr = true;
+        else
+            it->second.logger->duplicate();
     }

-    return makeLoggerPtr(logger);
+    return makeLoggerPtr(*it->second.logger);
 }

-Logger& Logger::unsafeGet(const std::string& name)
+std::pair<Logger::LoggerMapIterator, bool> Logger::unsafeGet(const std::string& name, bool get_shared)
 {
-    Logger* pLogger = find(name);
-    if (!pLogger)
+    std::optional<Logger::LoggerMapIterator> optional_logger_it = find(name);
+
+    bool should_recreate_logger = false;
+
+    if (optional_logger_it)
     {
+        auto & logger_it = *optional_logger_it;
+        std::optional<size_t> reference_count_before;
+
+        if (get_shared)
+        {
+            reference_count_before = logger_it->second.logger->duplicate();
+        }
+        else if (logger_it->second.owned_by_shared_ptr)
+        {
+            reference_count_before = logger_it->second.logger->duplicate();
+            logger_it->second.owned_by_shared_ptr = false;
+        }
+
+        /// Other thread already decided to delete this logger, but did not yet remove it from map
+        if (reference_count_before && reference_count_before == 0)
+            should_recreate_logger = true;
+    }
+
+    if (!optional_logger_it || should_recreate_logger)
+    {
+        Logger * logger = nullptr;
+
         if (name == ROOT)
         {
-            pLogger = new Logger(name, 0, Message::PRIO_INFORMATION);
+            logger = new Logger(name, nullptr, Message::PRIO_INFORMATION);
         }
         else
         {
             Logger& par = parent(name);
-            pLogger = new Logger(name, par.getChannel(), par.getLevel());
+            logger = new Logger(name, par.getChannel(), par.getLevel());
         }
-        add(pLogger);
+
+        if (should_recreate_logger)
+        {
+            (*optional_logger_it)->second.logger = logger;
+            return std::make_pair(*optional_logger_it, true);
+        }
+
+        return add(logger);
     }
-    return *pLogger;
+
+    return std::make_pair(*optional_logger_it, false);
+}
+
+Logger * Logger::unsafeGetRawPtr(const std::string & name)
+{
+    return unsafeGet(name, false /*get_shared*/).first->second.logger;
 }
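The `should_recreate_logger` branch guards a narrow race: `duplicate()` reports the reference count as it was before the increment, so seeing 0 means a concurrent `intrusive_ptr_release` has already committed to destroying the logger and is merely waiting on the mutex to erase the map entry; the entry must then be repopulated with a fresh object. The detection idiom, distilled (illustrative names, not from the patch):

    #include <atomic>
    #include <cstddef>

    struct Counted
    {
        std::atomic<size_t> counter{1};

        // Returns the value the counter held before our increment.
        size_t duplicate() { return counter.fetch_add(1, std::memory_order_acq_rel); }
    };

    // False means a concurrent release already won: the object is dying and the
    // caller must create a replacement instead of reviving this instance.
    bool tryRevive(Counted & object)
    {
        return object.duplicate() != 0;
    }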
@@ -415,24 +402,24 @@ Logger& Logger::create(const std::string& name, Channel* pChannel, int level)
 {
     std::lock_guard<std::mutex> lock(getLoggerMutex());

-    return unsafeCreate(name, pChannel, level);
+    return *unsafeCreate(name, pChannel, level).first->second.logger;
 }

 LoggerPtr Logger::createShared(const std::string & name, Channel * pChannel, int level)
 {
     std::lock_guard<std::mutex> lock(getLoggerMutex());

-    Logger & logger = unsafeCreate(name, pChannel, level);
-    _pLoggerMap->find(name)->second.owned_by_shared_ptr = true;
+    auto [it, inserted] = unsafeCreate(name, pChannel, level);
+    it->second.owned_by_shared_ptr = true;

-    return makeLoggerPtr(logger);
+    return makeLoggerPtr(*it->second.logger);
 }

 Logger& Logger::root()
 {
     std::lock_guard<std::mutex> lock(getLoggerMutex());

-    return unsafeGet(ROOT);
+    return *unsafeGetRawPtr(ROOT);
 }
@@ -440,7 +427,11 @@ Logger* Logger::has(const std::string& name)
 {
     std::lock_guard<std::mutex> lock(getLoggerMutex());

-    return find(name);
+    auto optional_it = find(name);
+    if (!optional_it)
+        return nullptr;
+
+    return (*optional_it)->second.logger;
 }
@@ -459,20 +450,69 @@ void Logger::shutdown()
         }

         delete _pLoggerMap;
-        _pLoggerMap = 0;
+        _pLoggerMap = nullptr;
     }
 }

-Logger* Logger::find(const std::string& name)
+std::optional<Logger::LoggerMapIterator> Logger::find(const std::string& name)
 {
     if (_pLoggerMap)
     {
         LoggerMap::iterator it = _pLoggerMap->find(name);
         if (it != _pLoggerMap->end())
-            return it->second.logger;
+            return it;
+
+        return {};
     }

-    return 0;
+    return {};
+}
+
+Logger * Logger::findRawPtr(const std::string & name)
+{
+    auto optional_it = find(name);
+    if (!optional_it)
+        return nullptr;
+
+    return (*optional_it)->second.logger;
+}
+
+void intrusive_ptr_add_ref(Logger * ptr)
+{
+    ptr->duplicate();
+}
+
+void intrusive_ptr_release(Logger * ptr)
+{
+    size_t reference_count_before = ptr->_counter.fetch_sub(1, std::memory_order_acq_rel);
+    if (reference_count_before != 1)
+        return;
+
+    {
+        std::lock_guard<std::mutex> lock(getLoggerMutex());
+
+        if (_pLoggerMap)
+        {
+            auto it = _pLoggerMap->find(ptr->name());
+
+            /** It is possible that during release other thread created logger and
+              * updated iterator in map.
+              */
+            if (it != _pLoggerMap->end() && ptr == it->second.logger)
+            {
+                /** If reference count is 0, this means this intrusive pointer owns logger
+                  * and need destroy it.
+                  */
+                assert(it->second.owned_by_shared_ptr);
+                _pLoggerMap->erase(it);
+            }
+        }
+    }
+
+    delete ptr;
 }
@@ -490,28 +530,28 @@ void Logger::names(std::vector<std::string>& names)
     }
 }

-Logger& Logger::unsafeCreate(const std::string & name, Channel * pChannel, int level)
+std::pair<Logger::LoggerMapIterator, bool> Logger::unsafeCreate(const std::string & name, Channel * pChannel, int level)
 {
     if (find(name)) throw ExistsException();
     Logger* pLogger = new Logger(name, pChannel, level);
-    add(pLogger);
-
-    return *pLogger;
+    return add(pLogger);
 }

 Logger& Logger::parent(const std::string& name)
 {
     std::string::size_type pos = name.rfind('.');
     if (pos != std::string::npos)
     {
         std::string pname = name.substr(0, pos);
-        Logger* pParent = find(pname);
+        Logger* pParent = findRawPtr(pname);
         if (pParent)
             return *pParent;
         else
             return parent(pname);
     }
-    else return unsafeGet(ROOT);
+    else return *unsafeGetRawPtr(ROOT);
 }

@@ -579,12 +619,14 @@ namespace
 }

-void Logger::add(Logger* pLogger)
+std::pair<Logger::LoggerMapIterator, bool> Logger::add(Logger* pLogger)
 {
     if (!_pLoggerMap)
-        _pLoggerMap = new LoggerMap;
+        _pLoggerMap = new Logger::LoggerMap;

-    _pLoggerMap->emplace(pLogger->name(), LoggerEntry{pLogger, false /*owned_by_shared_ptr*/});
+    auto result = _pLoggerMap->emplace(pLogger->name(), LoggerEntry{pLogger, false /*owned_by_shared_ptr*/});
+    assert(result.second);
+    return result;
 }

contrib/aws

@@ -1 +1 @@
-Subproject commit ca02358dcc7ce3ab733dd4cbcc32734eecfa4ee3
+Subproject commit 4ec215f3607c2111bf2cc91ba842046a6b5eb0c4

contrib/aws-c-auth

@@ -1 +1 @@
-Subproject commit 97133a2b5dbca1ccdf88cd6f44f39d0531d27d12
+Subproject commit baeffa791d9d1cf61460662a6d9ac2186aaf05df

contrib/aws-c-cal

@@ -1 +1 @@
-Subproject commit 85dd7664b786a389c6fb1a6f031ab4bb2282133d
+Subproject commit 9453687ff5493ba94eaccf8851200565c4364c77

@@ -1 +1 @@
-Subproject commit 45dcb2849c891dba2100b270b4676765c92949ff
+Subproject commit 80f21b3cac5ac51c6b8a62c7d2a5ef58a75195ee

@@ -1 +1 @@
-Subproject commit b517b7decd0dac30be2162f5186c250221c53aff
+Subproject commit 99ec79ee2970f1a045d4ced1501b97ee521f2f85

@@ -1 +1 @@
-Subproject commit 2f9b60c42f90840ec11822acda3d8cdfa97a773d
+Subproject commit 08f24e384e5be20bcffa42b49213d24dad7881ae

contrib/aws-c-http

@@ -1 +1 @@
-Subproject commit dd34461987947672444d0bc872c5a733dfdb9711
+Subproject commit a082f8a2067e4a31db73f1d4ffd702a8dc0f7089

contrib/aws-c-io

@@ -1 +1 @@
-Subproject commit d58ed4f272b1cb4f89ac9196526ceebe5f2b0d89
+Subproject commit 11ce3c750a1dac7b04069fc5bff89e97e91bad4d

contrib/aws-c-mqtt

@@ -1 +1 @@
-Subproject commit 33c3455cec82b16feb940e12006cefd7b3ef4194
+Subproject commit 6d36cd3726233cb757468d0ea26f6cd8dad151ec

contrib/aws-c-s3

@@ -1 +1 @@
-Subproject commit d7bfe602d6925948f1fff95784e3613cca6a3900
+Subproject commit de36fee8fe7ab02f10987877ae94a805bf440c1f

@@ -1 +1 @@
-Subproject commit 208a701fa01e99c7c8cc3dcebc8317da71362972
+Subproject commit fd8c0ba2e233997eaaefe82fb818b8b444b956d3

@@ -1 +1 @@
-Subproject commit ad53be196a25bbefa3700a01187fdce573a7d2d0
+Subproject commit 321b805559c8e911be5bddba13fcbd222a3e2d3a


@@ -25,6 +25,7 @@ include("${ClickHouse_SOURCE_DIR}/contrib/aws-cmake/AwsFeatureTests.cmake")
 include("${ClickHouse_SOURCE_DIR}/contrib/aws-cmake/AwsThreadAffinity.cmake")
 include("${ClickHouse_SOURCE_DIR}/contrib/aws-cmake/AwsThreadName.cmake")
 include("${ClickHouse_SOURCE_DIR}/contrib/aws-cmake/AwsSIMD.cmake")
+include("${ClickHouse_SOURCE_DIR}/contrib/aws-crt-cpp/cmake/AwsGetVersion.cmake")

 # Gather sources and options.
@@ -35,6 +36,8 @@ set(AWS_PUBLIC_COMPILE_DEFS)
 set(AWS_PRIVATE_COMPILE_DEFS)
 set(AWS_PRIVATE_LIBS)

+list(APPEND AWS_PRIVATE_COMPILE_DEFS "-DINTEL_NO_ITTNOTIFY_API")
+
 if (CMAKE_BUILD_TYPE_UC STREQUAL "DEBUG")
     list(APPEND AWS_PRIVATE_COMPILE_DEFS "-DDEBUG_BUILD")
 endif()
@@ -85,14 +88,20 @@ file(GLOB AWS_SDK_CORE_SRC
     "${AWS_SDK_CORE_DIR}/source/external/cjson/*.cpp"
     "${AWS_SDK_CORE_DIR}/source/external/tinyxml2/*.cpp"
     "${AWS_SDK_CORE_DIR}/source/http/*.cpp"
+    "${AWS_SDK_CORE_DIR}/source/http/crt/*.cpp"
     "${AWS_SDK_CORE_DIR}/source/http/standard/*.cpp"
     "${AWS_SDK_CORE_DIR}/source/internal/*.cpp"
     "${AWS_SDK_CORE_DIR}/source/monitoring/*.cpp"
+    "${AWS_SDK_CORE_DIR}/source/net/*.cpp"
+    "${AWS_SDK_CORE_DIR}/source/net/linux-shared/*.cpp"
+    "${AWS_SDK_CORE_DIR}/source/platform/linux-shared/*.cpp"
+    "${AWS_SDK_CORE_DIR}/source/smithy/tracing/*.cpp"
     "${AWS_SDK_CORE_DIR}/source/utils/*.cpp"
     "${AWS_SDK_CORE_DIR}/source/utils/base64/*.cpp"
+    "${AWS_SDK_CORE_DIR}/source/utils/component-registry/*.cpp"
     "${AWS_SDK_CORE_DIR}/source/utils/crypto/*.cpp"
-    "${AWS_SDK_CORE_DIR}/source/utils/crypto/openssl/*.cpp"
     "${AWS_SDK_CORE_DIR}/source/utils/crypto/factory/*.cpp"
+    "${AWS_SDK_CORE_DIR}/source/utils/crypto/openssl/*.cpp"
     "${AWS_SDK_CORE_DIR}/source/utils/event/*.cpp"
     "${AWS_SDK_CORE_DIR}/source/utils/json/*.cpp"
     "${AWS_SDK_CORE_DIR}/source/utils/logging/*.cpp"
@@ -115,9 +124,8 @@ OPTION(USE_AWS_MEMORY_MANAGEMENT "Aws memory management" OFF)
 configure_file("${AWS_SDK_CORE_DIR}/include/aws/core/SDKConfig.h.in"
     "${CMAKE_CURRENT_BINARY_DIR}/include/aws/core/SDKConfig.h" @ONLY)

-list(APPEND AWS_PUBLIC_COMPILE_DEFS "-DAWS_SDK_VERSION_MAJOR=1")
-list(APPEND AWS_PUBLIC_COMPILE_DEFS "-DAWS_SDK_VERSION_MINOR=10")
-list(APPEND AWS_PUBLIC_COMPILE_DEFS "-DAWS_SDK_VERSION_PATCH=36")
+aws_get_version(AWS_CRT_CPP_VERSION_MAJOR AWS_CRT_CPP_VERSION_MINOR AWS_CRT_CPP_VERSION_PATCH FULL_VERSION GIT_HASH)
+configure_file("${AWS_CRT_DIR}/include/aws/crt/Config.h.in" "${AWS_CRT_DIR}/include/aws/crt/Config.h" @ONLY)

 list(APPEND AWS_SOURCES ${AWS_SDK_CORE_SRC} ${AWS_SDK_CORE_NET_SRC} ${AWS_SDK_CORE_PLATFORM_SRC})
@@ -176,6 +184,7 @@ file(GLOB AWS_COMMON_SRC
     "${AWS_COMMON_DIR}/source/*.c"
     "${AWS_COMMON_DIR}/source/external/*.c"
     "${AWS_COMMON_DIR}/source/posix/*.c"
+    "${AWS_COMMON_DIR}/source/linux/*.c"
 )

 file(GLOB AWS_COMMON_ARCH_SRC

contrib/aws-crt-cpp

@@ -1 +1 @@
-Subproject commit 8a301b7e842f1daed478090c869207300972379f
+Subproject commit f532d6abc0d2b0d8b5d6fe9e7c51eaedbe4afbd0

contrib/aws-s2n-tls

@@ -1 +1 @@
-Subproject commit 71f4794b7580cf780eb4aca77d69eded5d3c7bb4
+Subproject commit 9a1e75454023e952b366ce1eab9c54007250119f

contrib/libxml2

@@ -1 +1 @@
-Subproject commit 8292f361458fcffe0bff515a385be02e9d35582c
+Subproject commit 223cb03a5d27b1b2393b266a8657443d046139d6


@@ -21,7 +21,7 @@ extern "C" {
  * your library and includes mismatch
  */
 #ifndef LIBXML2_COMPILING_MSCCDEF
-XMLPUBFUN void xmlCheckVersion(int version);
+XMLPUBFUN void XMLCALL xmlCheckVersion(int version);
 #endif /* LIBXML2_COMPILING_MSCCDEF */

 /**
@@ -29,28 +29,28 @@ XMLPUBFUN void xmlCheckVersion(int version);
  *
  * the version string like "1.2.3"
  */
-#define LIBXML_DOTTED_VERSION "2.12.4"
+#define LIBXML_DOTTED_VERSION "2.10.3"

 /**
  * LIBXML_VERSION:
  *
  * the version number: 1.2.3 value is 10203
  */
-#define LIBXML_VERSION 21204
+#define LIBXML_VERSION 21003

 /**
  * LIBXML_VERSION_STRING:
  *
  * the version number string, 1.2.3 value is "10203"
  */
-#define LIBXML_VERSION_STRING "21204"
+#define LIBXML_VERSION_STRING "21003"

 /**
  * LIBXML_VERSION_EXTRA:
  *
  * extra version information, used to show a git commit description
  */
-#define LIBXML_VERSION_EXTRA "-GITv2.12.4"
+#define LIBXML_VERSION_EXTRA ""

 /**
  * LIBXML_TEST_VERSION:
@@ -58,7 +58,7 @@ XMLPUBFUN void xmlCheckVersion(int version);
  * Macro to check that the libxml version in use is compatible with
  * the version the software has been compiled against
  */
-#define LIBXML_TEST_VERSION xmlCheckVersion(21204);
+#define LIBXML_TEST_VERSION xmlCheckVersion(21003);

 #ifndef VMS
 #if 0
@@ -270,7 +270,7 @@ XMLPUBFUN void xmlCheckVersion(int version);
  *
  * Whether iconv support is available
  */
-#if 1
+#if 0
 #define LIBXML_ICONV_ENABLED
 #endif
@@ -313,7 +313,7 @@ XMLPUBFUN void xmlCheckVersion(int version);
 /**
  * LIBXML_DEBUG_RUNTIME:
  *
- * Removed
+ * Whether the runtime debugging is configured in
  */
 #if 0
 #define LIBXML_DEBUG_RUNTIME
@@ -409,7 +409,12 @@ XMLPUBFUN void xmlCheckVersion(int version);
 #endif

 #ifdef __GNUC__
-/** DOC_DISABLE */
+/**
+ * ATTRIBUTE_UNUSED:
+ *
+ * Macro used to signal to GCC unused function parameters
+ */
 #ifndef ATTRIBUTE_UNUSED
 # if ((__GNUC__ > 2) || ((__GNUC__ == 2) && (__GNUC_MINOR__ >= 7)))
@@ -419,6 +424,12 @@ XMLPUBFUN void xmlCheckVersion(int version);
 # endif
 #endif

+/**
+ * LIBXML_ATTR_ALLOC_SIZE:
+ *
+ * Macro used to indicate to GCC this is an allocator function
+ */
 #ifndef LIBXML_ATTR_ALLOC_SIZE
 # if (!defined(__clang__) && ((__GNUC__ > 4) || ((__GNUC__ == 4) && (__GNUC_MINOR__ >= 3))))
 #  define LIBXML_ATTR_ALLOC_SIZE(x) __attribute__((alloc_size(x)))
@@ -429,6 +440,12 @@ XMLPUBFUN void xmlCheckVersion(int version);
 #  define LIBXML_ATTR_ALLOC_SIZE(x)
 #endif

+/**
+ * LIBXML_ATTR_FORMAT:
+ *
+ * Macro used to indicate to GCC the parameter are printf like
+ */
 #ifndef LIBXML_ATTR_FORMAT
 # if ((__GNUC__ > 3) || ((__GNUC__ == 3) && (__GNUC_MINOR__ >= 3)))
 #  define LIBXML_ATTR_FORMAT(fmt,args) __attribute__((__format__(__printf__,fmt,args)))
@@ -440,69 +457,44 @@ XMLPUBFUN void xmlCheckVersion(int version);
 #endif

 #ifndef XML_DEPRECATED
-# if defined (IN_LIBXML) || (__GNUC__ * 100 + __GNUC_MINOR__ < 301)
+# ifdef IN_LIBXML
 #  define XML_DEPRECATED
-/* Available since at least GCC 3.1 */
 # else
+/* Available since at least GCC 3.1 */
 #  define XML_DEPRECATED __attribute__((deprecated))
 # endif
 #endif

-#if defined(__clang__) || (__GNUC__ * 100 + __GNUC_MINOR__ >= 406)
-#if defined(__clang__) || (__GNUC__ * 100 + __GNUC_MINOR__ >= 800)
-#define XML_IGNORE_FPTR_CAST_WARNINGS \
-    _Pragma("GCC diagnostic push") \
-    _Pragma("GCC diagnostic ignored \"-Wpedantic\"") \
-    _Pragma("GCC diagnostic ignored \"-Wcast-function-type\"")
-#else
-#define XML_IGNORE_FPTR_CAST_WARNINGS \
-    _Pragma("GCC diagnostic push") \
-    _Pragma("GCC diagnostic ignored \"-Wpedantic\"")
-#endif
-#define XML_POP_WARNINGS \
-    _Pragma("GCC diagnostic pop")
-#else
-#define XML_IGNORE_FPTR_CAST_WARNINGS
-#define XML_POP_WARNINGS
-#endif
-
 #else /* ! __GNUC__ */
+/**
+ * ATTRIBUTE_UNUSED:
+ *
+ * Macro used to signal to GCC unused function parameters
+ */
 #define ATTRIBUTE_UNUSED
+/**
+ * LIBXML_ATTR_ALLOC_SIZE:
+ *
+ * Macro used to indicate to GCC this is an allocator function
+ */
 #define LIBXML_ATTR_ALLOC_SIZE(x)
+/**
+ * LIBXML_ATTR_FORMAT:
+ *
+ * Macro used to indicate to GCC the parameter are printf like
+ */
 #define LIBXML_ATTR_FORMAT(fmt,args)
+/**
+ * XML_DEPRECATED:
+ *
+ * Macro used to indicate that a function, variable, type or struct member
+ * is deprecated.
+ */
 #ifndef XML_DEPRECATED
-# if defined (IN_LIBXML) || !defined (_MSC_VER)
-#  define XML_DEPRECATED
-/* Available since Visual Studio 2005 */
-# elif defined (_MSC_VER) && (_MSC_VER >= 1400)
-#  define XML_DEPRECATED __declspec(deprecated)
-# endif
-#endif
-
-#if defined (_MSC_VER) && (_MSC_VER >= 1400)
-# define XML_IGNORE_FPTR_CAST_WARNINGS __pragma(warning(push))
-#else
-# define XML_IGNORE_FPTR_CAST_WARNINGS
-#endif
-
-#ifndef XML_POP_WARNINGS
-# if defined (_MSC_VER) && (_MSC_VER >= 1400)
-#  define XML_POP_WARNINGS __pragma(warning(pop))
-# else
-#  define XML_POP_WARNINGS
-# endif
+#define XML_DEPRECATED
 #endif
 #endif /* __GNUC__ */

+#define XML_NO_ATTR
+
+#ifdef LIBXML_THREAD_ENABLED
+#define XML_DECLARE_GLOBAL(name, type, attrs) \
+attrs XMLPUBFUN type *__##name(void);
+#define XML_GLOBAL_MACRO(name) (*__##name())
+#else
+#define XML_DECLARE_GLOBAL(name, type, attrs) \
+attrs XMLPUBVAR type name;
+#endif
+
 #ifdef __cplusplus
 }
 #endif /* __cplusplus */


@@ -24,7 +24,7 @@ git config --file .gitmodules --get-regexp '.*path' | sed 's/[^ ]* //' | xargs -
 # We don't want to depend on any third-party CMake files.
 # To check it, find and delete them.
 grep -o -P '"contrib/[^"]+"' .gitmodules |
-    grep -v -P 'contrib/(llvm-project|google-protobuf|grpc|abseil-cpp|corrosion)' |
+    grep -v -P 'contrib/(llvm-project|google-protobuf|grpc|abseil-cpp|corrosion|aws-crt-cpp)' |
     xargs -I@ find @ \
         -'(' -name 'CMakeLists.txt' -or -name '*.cmake' -')' -and -not -name '*.h.cmake' \
         -delete


@@ -34,7 +34,7 @@ RUN arch=${TARGETARCH:-amd64} \
 # lts / testing / prestable / etc
 ARG REPO_CHANNEL="stable"
 ARG REPOSITORY="https://packages.clickhouse.com/tgz/${REPO_CHANNEL}"
-ARG VERSION="24.1.1.2048"
+ARG VERSION="24.1.2.5"
 ARG PACKAGES="clickhouse-keeper"
 ARG DIRECT_DOWNLOAD_URLS=""


@@ -32,7 +32,7 @@ RUN arch=${TARGETARCH:-amd64} \
 # lts / testing / prestable / etc
 ARG REPO_CHANNEL="stable"
 ARG REPOSITORY="https://packages.clickhouse.com/tgz/${REPO_CHANNEL}"
-ARG VERSION="24.1.1.2048"
+ARG VERSION="24.1.2.5"
 ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
 ARG DIRECT_DOWNLOAD_URLS=""


@@ -30,7 +30,7 @@ RUN sed -i "s|http://archive.ubuntu.com|${apt_archive}|g" /etc/apt/sources.list
 ARG REPO_CHANNEL="stable"
 ARG REPOSITORY="deb [signed-by=/usr/share/keyrings/clickhouse-keyring.gpg] https://packages.clickhouse.com/deb ${REPO_CHANNEL} main"
-ARG VERSION="24.1.1.2048"
+ARG VERSION="24.1.2.5"
 ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"

 # set non-empty deb_location_url url to create a docker image


@@ -211,6 +211,17 @@ function build
     echo "build_clickhouse_fasttest_binary: [ OK ] $BUILD_SECONDS_ELAPSED sec." \
         | ts '%Y-%m-%d %H:%M:%S' \
         | tee "$FASTTEST_OUTPUT/test_result.txt"
+
+    (
+        # This query should fail, and print stacktrace with proper symbol names (even on a stripped binary)
+        clickhouse_output=$(programs/clickhouse-stripped --stacktrace -q 'select' 2>&1 || :)
+        if [[ $clickhouse_output =~ DB::LocalServer::main ]]; then
+            echo "stripped_clickhouse_shows_symbols_names: [ OK ] 0 sec."
+        else
+            echo -e "stripped_clickhouse_shows_symbols_names: [ FAIL ] 0 sec. - clickhouse output:\n\n$clickhouse_output\n"
+        fi
+    ) | ts '%Y-%m-%d %H:%M:%S' | tee -a "$FASTTEST_OUTPUT/test_result.txt"
+
     if [ "$COPY_CLICKHOUSE_BINARY_TO_OUTPUT" -eq "1" ]; then
         mkdir -p "$FASTTEST_OUTPUT/binaries/"
         cp programs/clickhouse "$FASTTEST_OUTPUT/binaries/clickhouse"


@@ -0,0 +1,14 @@
+---
+sidebar_position: 1
+sidebar_label: 2024
+---
+
+# 2024 Changelog
+
+### ClickHouse release v24.1.2.5-stable (b2605dd4a5a) FIXME as compared to v24.1.1.2048-stable (5a024dfc093)
+
+#### Bug Fix (user-visible misbehavior in an official stable release)
+
+* Fix translate() with FixedString input [#59356](https://github.com/ClickHouse/ClickHouse/pull/59356) ([Raúl Marín](https://github.com/Algunenano)).
+* Fix stacktraces for binaries without debug symbols [#59444](https://github.com/ClickHouse/ClickHouse/pull/59444) ([Azat Khuzhin](https://github.com/azat)).


@@ -70,6 +70,19 @@
     if (params.has('password')) { password = params.get('password'); }
 }

+let url = `${host}?allow_introspection_functions=1`;
+
+if (add_http_cors_header) {
+    url += '&add_http_cors_header=1';
+}
+
+if (user) {
+    url += `&user=${encodeURIComponent(user)}`;
+}
+
+if (password) {
+    url += `&password=${encodeURIComponent(password)}`;
+}
+
 let map = L.map('space', {
     crs: L.CRS.Simple,
     center: [-512, 512],
@@ -103,24 +116,11 @@
     const key = `${coords.z}-${coords.x}-${coords.y}`;
     let buf = cached_tiles[key];
     if (!buf) {
-        let url = `${host}?default_format=RowBinary&allow_introspection_functions=1`;
-
-        if (add_http_cors_header) {
-            // For debug purposes, you may set add_http_cors_header from a browser console
-            url += '&add_http_cors_header=1';
-        }
-
-        if (user) {
-            url += `&user=${encodeURIComponent(user)}`;
-        }
-
-        if (password) {
-            url += `&password=${encodeURIComponent(password)}`;
-        }
-
-        url += `&param_z=${coords.z}&param_x=${coords.x}&param_y=${coords.y}`;
-        url += `&enable_http_compression=1&network_compression_method=zstd&network_zstd_compression_level=6`;
-
-        const response = await fetch(url, { method: 'POST', body: sql });
+        let request_url = `${url}&default_format=RowBinary` +
+            `&param_z=${coords.z}&param_x=${coords.x}&param_y=${coords.y}` +
+            `&enable_http_compression=1&network_compression_method=zstd&network_zstd_compression_level=6`;
+
+        const response = await fetch(request_url, { method: 'POST', body: sql });

         if (!response.ok) {
             const text = await response.text();
@@ -238,7 +238,7 @@
     const addr_hex = '0x' + addr_int.toString(16);

     const response = fetch(
-        `http://localhost:8123/?default_format=JSON`,
+        `${url}&default_format=JSON`,
         {
             method: 'POST',
             body: `SELECT encodeXMLComponent(demangle(addressToSymbol(${addr_int}::UInt64))) AS name,


@@ -1214,7 +1214,7 @@ private:
     static void expandGroupByAll(QueryNode & query_tree_node_typed);

-    static void expandOrderByAll(QueryNode & query_tree_node_typed, const Settings & settings);
+    void expandOrderByAll(QueryNode & query_tree_node_typed, const Settings & settings);

     static std::string
     rewriteAggregateFunctionNameIfNeeded(const std::string & aggregate_function_name, NullsAction action, const ContextPtr & context);
@@ -2349,15 +2349,18 @@ void QueryAnalyzer::expandOrderByAll(QueryNode & query_tree_node_typed, const Se
     for (auto & node : projection_nodes)
     {
-        if (auto * identifier_node = node->as<IdentifierNode>(); identifier_node != nullptr)
-            if (Poco::toUpper(identifier_node->getIdentifier().getFullName()) == "ALL" || Poco::toUpper(identifier_node->getAlias()) == "ALL")
-                throw Exception(ErrorCodes::UNEXPECTED_EXPRESSION,
-                    "Cannot use ORDER BY ALL to sort a column with name 'all', please disable setting `enable_order_by_all` and try again");
-
-        if (auto * function_node = node->as<FunctionNode>(); function_node != nullptr)
-            if (Poco::toUpper(function_node->getAlias()) == "ALL")
+        auto resolved_expression_it = resolved_expressions.find(node);
+        if (resolved_expression_it != resolved_expressions.end())
+        {
+            auto projection_names = resolved_expression_it->second;
+            if (projection_names.size() != 1)
+                throw Exception(ErrorCodes::LOGICAL_ERROR,
+                    "Expression nodes list expected 1 projection names. Actual {}",
+                    projection_names.size());
+            if (Poco::toUpper(projection_names[0]) == "ALL")
                 throw Exception(ErrorCodes::UNEXPECTED_EXPRESSION,
                     "Cannot use ORDER BY ALL to sort a column with name 'all', please disable setting `enable_order_by_all` and try again");
+        }

         auto sort_node = std::make_shared<SortNode>(node, all_node->getSortDirection(), all_node->getNullsSortDirection());
         list_node->getNodes().push_back(sort_node);
@@ -7180,8 +7183,6 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
     if (query_node_typed.hasHaving() && query_node_typed.isGroupByWithTotals() && is_rollup_or_cube)
         throw Exception(ErrorCodes::NOT_IMPLEMENTED, "WITH TOTALS and WITH ROLLUP or CUBE are not supported together in presence of HAVING");

-    expandOrderByAll(query_node_typed, settings);
-
     /// Initialize aliases in query node scope
     QueryExpressionsAliasVisitor visitor(scope);
@@ -7368,6 +7369,7 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
         if (settings.enable_positional_arguments)
             replaceNodesWithPositionalArguments(query_node_typed.getOrderByNode(), query_node_typed.getProjection().getNodes(), scope);

+        expandOrderByAll(query_node_typed, settings);
         resolveSortNodeList(query_node_typed.getOrderByNode(), scope);
     }


@@ -6,7 +6,7 @@
 #include <Parsers/ASTBackupQuery.h>
 #include <Storages/IStorage_fwd.h>
 #include <Storages/TableLockHolder.h>
-#include <Storages/MergeTree/ZooKeeperRetries.h>
+#include <Common/ZooKeeper/ZooKeeperRetries.h>

 #include <filesystem>
 #include <queue>


@@ -1,6 +1,6 @@
 #pragma once

-#include <Storages/MergeTree/ZooKeeperRetries.h>
+#include <Common/ZooKeeper/ZooKeeperRetries.h>
 #include <Common/ZooKeeper/Common.h>
 #include <Common/ZooKeeper/ZooKeeperWithFaultInjection.h>


@@ -2,6 +2,8 @@

 #include <memory>

+#include <base/defines.h>
+
 #include <Poco/Channel.h>
 #include <Poco/Logger.h>
 #include <Poco/Message.h>
@@ -24,6 +26,16 @@ using LoggerRawPtr = Poco::Logger *;
   */
 LoggerPtr getLogger(const std::string & name);

+/** Get Logger with specified name. If the Logger does not exists, it is created.
+  * This overload was added for specific purpose, when logger is constructed from constexpr string.
+  * Logger is destroyed only during program shutdown.
+  */
+template <size_t n>
+ALWAYS_INLINE LoggerPtr getLogger(const char (&name)[n])
+{
+    return Poco::Logger::getShared(name, false /*should_be_owned_by_shared_ptr_if_created*/);
+}
+
 /** Create Logger with specified name, channel and logging level.
   * If Logger already exists, throws exception.
   * Logger is destroyed, when last shared ptr that refers to Logger with specified name is destroyed.
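The array-reference template matters for overload resolution: a string literal binds to `const char (&)[n]` exactly, which beats the user-defined conversion to `std::string`, so literal call sites get the logger that lives until shutdown while dynamically built names keep shared-pointer ownership. A standalone sketch of the same dispatch (demo names, not the real header):

    #include <cstddef>
    #include <iostream>
    #include <string>

    // Taken for names built at runtime.
    void getLoggerDemo(const std::string & name)
    {
        std::cout << "std::string overload: " << name << '\n';
    }

    // Taken for string literals: binding the array is an exact match,
    // so it wins over the conversion to std::string.
    template <size_t n>
    void getLoggerDemo(const char (&name)[n])
    {
        std::cout << "literal overload: " << name << '\n';
    }

    int main()
    {
        getLoggerDemo("S3ObjectStorage");             // literal overload
        getLoggerDemo(std::string("Logger_") + "1");  // std::string overload
    }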


@@ -317,16 +317,19 @@ constexpr std::pair<std::string_view, std::string_view> replacements[]
 // Demangle @c symbol_name if it's not from __functional header (as such functions don't provide any useful
 // information but pollute stack traces).
 // Replace parts from @c replacements with shorter aliases
-String demangleAndCollapseNames(std::string_view file, const char * const symbol_name)
+String demangleAndCollapseNames(std::optional<std::string_view> file, const char * const symbol_name)
 {
     if (!symbol_name)
         return "?";

-    std::string_view file_copy = file;
-    if (auto trim_pos = file.find_last_of('/'); trim_pos != file.npos)
-        file_copy.remove_suffix(file.size() - trim_pos);
-    if (file_copy.ends_with("functional"))
-        return "?";
+    if (file.has_value())
+    {
+        std::string_view file_copy = file.value();
+        if (auto trim_pos = file_copy.find_last_of('/'); trim_pos != file_copy.npos)
+            file_copy.remove_suffix(file_copy.size() - trim_pos);
+        if (file_copy.ends_with("functional"))
+            return "?";
+    }

     String haystack = demangle(symbol_name);
@@ -393,8 +396,8 @@ toStringEveryLineImpl([[maybe_unused]] bool fatal, const StackTraceRefTriple & s
     if (frame.file.has_value() && frame.line.has_value())
         out << *frame.file << ':' << *frame.line << ": ";

-    if (frame.symbol.has_value() && frame.file.has_value())
-        out << demangleAndCollapseNames(*frame.file, frame.symbol->data());
+    if (frame.symbol.has_value())
+        out << demangleAndCollapseNames(frame.file, frame.symbol->data());
     else
         out << "?";


@@ -9,6 +9,7 @@
 #include <Poco/NullChannel.h>
 #include <Poco/StreamChannel.h>
 #include <sstream>
+#include <thread>

 TEST(Logger, Log)
@@ -100,3 +101,75 @@ TEST(Logger, SideEffects)
     LOG_TRACE(log, "test no throw {}", getLogMessageParamOrThrow());
 }
+
+TEST(Logger, SharedRawLogger)
+{
+    {
+        std::ostringstream stream; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
+        auto stream_channel = Poco::AutoPtr<Poco::StreamChannel>(new Poco::StreamChannel(stream));
+
+        auto shared_logger = getLogger("Logger_1");
+        shared_logger->setChannel(stream_channel.get());
+        shared_logger->setLevel("trace");
+
+        LOG_TRACE(shared_logger, "SharedLogger1Log1");
+        LOG_TRACE(getRawLogger("Logger_1"), "RawLogger1Log");
+        LOG_TRACE(shared_logger, "SharedLogger1Log2");
+
+        auto actual = stream.str();
+        EXPECT_EQ(actual, "SharedLogger1Log1\nRawLogger1Log\nSharedLogger1Log2\n");
+    }
+    {
+        std::ostringstream stream; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
+        auto stream_channel = Poco::AutoPtr<Poco::StreamChannel>(new Poco::StreamChannel(stream));
+
+        auto * raw_logger = getRawLogger("Logger_2");
+        raw_logger->setChannel(stream_channel.get());
+        raw_logger->setLevel("trace");
+
+        LOG_TRACE(getLogger("Logger_2"), "SharedLogger2Log1");
+        LOG_TRACE(raw_logger, "RawLogger2Log");
+        LOG_TRACE(getLogger("Logger_2"), "SharedLogger2Log2");
+
+        auto actual = stream.str();
+        EXPECT_EQ(actual, "SharedLogger2Log1\nRawLogger2Log\nSharedLogger2Log2\n");
+    }
+}
+
+TEST(Logger, SharedLoggersThreadSafety)
+{
+    static size_t threads_count = std::thread::hardware_concurrency();
+    static constexpr size_t loggers_count = 10;
+    static constexpr size_t logger_get_count = 1000;
+
+    Poco::Logger::root();
+
+    std::vector<std::string> names;
+
+    Poco::Logger::names(names);
+    size_t loggers_size_before = names.size();
+
+    std::vector<std::thread> threads;
+
+    for (size_t thread_index = 0; thread_index < threads_count; ++thread_index)
+    {
+        threads.emplace_back([]()
+        {
+            for (size_t logger_index = 0; logger_index < loggers_count; ++logger_index)
+            {
+                for (size_t iteration = 0; iteration < logger_get_count; ++iteration)
+                {
+                    getLogger("Logger_" + std::to_string(logger_index));
+                }
+            }
+        });
+    }
+
+    for (auto & thread : threads)
+        thread.join();
+
+    Poco::Logger::names(names);
+    size_t loggers_size_after = names.size();
+
+    EXPECT_EQ(loggers_size_before, loggers_size_after);
+}


@@ -102,7 +102,7 @@ void checkS3Capabilities(
     if (s3_capabilities.support_batch_delete && !checkBatchRemove(storage, key_with_trailing_slash))
     {
         LOG_WARNING(
-            &Poco::Logger::get("S3ObjectStorage"),
+            getLogger("S3ObjectStorage"),
             "Storage for disk {} does not support batch delete operations, "
             "so `s3_capabilities.support_batch_delete` was automatically turned off during the access check. "
            "To remove this message set `s3_capabilities.support_batch_delete` for the disk to `false`.",


@@ -82,7 +82,7 @@ WebObjectStorage::loadFiles(const String & path, const std::unique_lock<std::sha
         if (!inserted)
             throw Exception(ErrorCodes::LOGICAL_ERROR, "Loading data for {} more than once", file_path);

-        LOG_TRACE(&Poco::Logger::get("DiskWeb"), "Adding file: {}, size: {}", file_path, size);
+        LOG_TRACE(getLogger("DiskWeb"), "Adding file: {}, size: {}", file_path, size);

         loaded_files.emplace_back(file_path);
     }


@@ -267,8 +267,10 @@ private:
             /// We don't support WITH cte as (subquery) Select table JOIN cte because we don't do conversion in AST
             bool is_subquery = false;
             if (const auto * ast_table_expr = table_elem.table_expression->as<ASTTableExpression>())
-                is_subquery = ast_table_expr->subquery->as<ASTSubquery>() != nullptr
+            {
+                is_subquery = ast_table_expr->subquery && ast_table_expr->subquery->as<ASTSubquery>() != nullptr
                     && ast_table_expr->subquery->as<ASTSubquery>()->cte_name.empty();
+            }
             else if (table_elem.table_expression->as<ASTSubquery>())
                 is_subquery = true;


@@ -5,7 +5,7 @@
 #include <Processors/ISource.h>
 #include <Interpreters/Context_fwd.h>
 #include <Parsers/IAST_fwd.h>
-#include <Storages/MergeTree/ZooKeeperRetries.h>
+#include <Common/ZooKeeper/ZooKeeperRetries.h>

 namespace zkutil


@@ -20,7 +20,6 @@
 #include <Disks/ObjectStorages/DiskObjectStorage.h>
 #include <Disks/TemporaryFileOnDisk.h>
 #include <Disks/createVolume.h>
-#include <Functions/IFunction.h>
 #include <IO/Operators.h>
 #include <IO/S3Common.h>
 #include <IO/SharedThreadPools.h>
@@ -47,7 +46,6 @@
 #include <Parsers/ASTPartition.h>
 #include <Parsers/ASTSetQuery.h>
 #include <Parsers/ASTTablesInSelectQuery.h>
-#include <Parsers/ExpressionListParsers.h>
 #include <Parsers/parseQuery.h>
 #include <Parsers/queryToString.h>
 #include <Processors/Formats/IInputFormat.h>
@@ -62,9 +60,7 @@
 #include <Storages/MergeTree/MergeTreeDataPartCloner.h>
 #include <Storages/MergeTree/MergeTreeDataPartCompact.h>
 #include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
-#include <Storages/MergeTree/MergeTreeDataPartWide.h>
 #include <Storages/Statistics/Estimator.h>
-#include <Storages/MergeTree/MergeTreeSelectProcessor.h>
 #include <Storages/MergeTree/RangesInDataPart.h>
 #include <Storages/MergeTree/checkDataPart.h>
 #include <Storages/MutationCommands.h>
@@ -75,12 +71,10 @@
 #include <Common/CurrentMetrics.h>
 #include <Common/Increment.h>
 #include <Common/ProfileEventsScope.h>
-#include <Common/SimpleIncrement.h>
 #include <Common/Stopwatch.h>
 #include <Common/StringUtils/StringUtils.h>
 #include <Common/ThreadFuzzer.h>
 #include <Common/escapeForFileName.h>
-#include <Common/getNumberOfPhysicalCPUCores.h>
 #include <Common/noexcept_scope.h>
 #include <Common/quoteString.h>
 #include <Common/scope_guard_safe.h>
@@ -91,13 +85,10 @@
 #include <base/insertAtEnd.h>
 #include <base/interpolate.h>
-#include <base/defines.h>

 #include <algorithm>
 #include <atomic>
-#include <cmath>
 #include <chrono>
-#include <iomanip>
 #include <limits>
 #include <optional>
 #include <ranges>


@@ -180,7 +180,7 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> cloneSourcePart(
     }

     LOG_DEBUG(
-        &Poco::Logger::get("MergeTreeDataPartCloner"),
+        getLogger("MergeTreeDataPartCloner"),
         "Clone {} part {} to {}{}",
         src_flushed_tmp_part ? "flushed" : "",
         src_part_storage->getFullPath(),


@@ -3,7 +3,7 @@
 #include <Processors/Sinks/SinkToStorage.h>
 #include <Storages/MergeTree/MergeTreeData.h>
 #include <base/types.h>
-#include <Storages/MergeTree/ZooKeeperRetries.h>
+#include <Common/ZooKeeper/ZooKeeperRetries.h>
 #include <Common/ZooKeeper/ZooKeeperWithFaultInjection.h>
 #include <Storages/MergeTree/AsyncBlockIDsCache.h>


@ -637,25 +637,31 @@ void S3QueueFilesMetadata::setFileProcessedForUnorderedMode(ProcessingNodeHolder
"this could be a result of expired zookeeper session", path); "this could be a result of expired zookeeper session", path);
} }
void S3QueueFilesMetadata::setFileProcessedForOrderedMode(ProcessingNodeHolderPtr holder) void S3QueueFilesMetadata::setFileProcessedForOrderedMode(ProcessingNodeHolderPtr holder)
{
auto processed_node_path = isShardedProcessing()
? zookeeper_processed_path / toString(getProcessingIdForPath(holder->path))
: zookeeper_processed_path;
return setFileProcessedForOrderedModeImpl(holder->path, holder, processed_node_path);
}
void S3QueueFilesMetadata::setFileProcessedForOrderedModeImpl(
const std::string & path, ProcessingNodeHolderPtr holder, const std::string & processed_node_path)
{ {
/// Update a persistent node in /processed and remove ephemeral node from /processing. /// Update a persistent node in /processed and remove ephemeral node from /processing.
const auto & path = holder->path;
const auto node_name = getNodeName(path); const auto node_name = getNodeName(path);
const auto node_metadata = createNodeMetadata(path).toString(); const auto node_metadata = createNodeMetadata(path).toString();
const auto zk_client = getZooKeeper(); const auto zk_client = getZooKeeper();
auto processed_node = isShardedProcessing() LOG_TEST(log, "Setting file `{}` as processed (at {})", path, processed_node_path);
? zookeeper_processed_path / toString(getProcessingIdForPath(path))
: zookeeper_processed_path;
LOG_TEST(log, "Setting file `{}` as processed", path);
while (true) while (true)
{ {
std::string res; std::string res;
Coordination::Stat stat; Coordination::Stat stat;
bool exists = zk_client->tryGet(processed_node, res, &stat); bool exists = zk_client->tryGet(processed_node_path, res, &stat);
Coordination::Requests requests; Coordination::Requests requests;
if (exists) if (exists)
{ {
@ -664,28 +670,20 @@ void S3QueueFilesMetadata::setFileProcessedForOrderedMode(ProcessingNodeHolderPt
            auto metadata = NodeMetadata::fromString(res);
            if (metadata.file_path >= path)
            {
-               /// Here we get into the case where the maximum processed file is bigger than ours.
-               /// This is possible to achieve in case of parallel processing,
-               /// but for local processing we explicitly disable parallel mode and do everything in a single thread
-               /// (see the constructor of StorageS3Queue, where s3queue_processing_threads_num is explicitly set to 1 in case of Ordered mode).
-               /// Nevertheless, in case of distributed processing we cannot do anything about parallelism.
-               /// What does this mean?
-               /// It means that in the scenario "distributed processing + Ordered mode"
-               /// the setting s3queue_loading_retries will not work. It is possible to fix; it is in the TODO.
-               /// Return because there is nothing to change:
-               /// the max processed file is already bigger than ours.
+               LOG_TRACE(log, "File {} is already processed, current max processed file: {}", path, metadata.file_path);
                return;
            }
        }
-       requests.push_back(zkutil::makeSetRequest(processed_node, node_metadata, stat.version));
+       requests.push_back(zkutil::makeSetRequest(processed_node_path, node_metadata, stat.version));
    }
    else
    {
-       requests.push_back(zkutil::makeCreateRequest(processed_node, node_metadata, zkutil::CreateMode::Persistent));
+       requests.push_back(zkutil::makeCreateRequest(processed_node_path, node_metadata, zkutil::CreateMode::Persistent));
    }

    Coordination::Responses responses;
+   if (holder)
+   {
        if (holder->remove(&requests, &responses))
        {
            LOG_TEST(log, "Moved file `{}` to processed", path);
@@ -693,10 +691,20 @@ void S3QueueFilesMetadata::setFileProcessedForOrderedMode(ProcessingNodeHolderPt
            zk_client->tryRemove(zookeeper_failed_path / (node_name + ".retriable"), -1);
            return;
        }
+   }
+   else
+   {
+       auto code = zk_client->tryMulti(requests, responses);
+       if (code == Coordination::Error::ZOK)
+           return;
+   }

    /// Failed to update max processed node, retry.
    if (!responses.empty() && responses[0]->error != Coordination::Error::ZOK)
+   {
+       LOG_TRACE(log, "Failed to update processed node ({}). Will retry.", magic_enum::enum_name(responses[0]->error));
        continue;
+   }

    LOG_WARNING(log, "Cannot set file ({}) as processed since the processing node "
        "with the expected processing id does not exist, "
@@ -705,6 +713,22 @@ void S3QueueFilesMetadata::setFileProcessedForOrderedMode(ProcessingNodeHolderPt
    }
}

+void S3QueueFilesMetadata::setFileProcessed(const std::string & path, size_t shard_id)
+{
+    if (mode != S3QueueMode::ORDERED)
+        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Can set file as processed only for Ordered mode");
+
+    if (isShardedProcessing())
+    {
+        for (const auto & processor : getProcessingIdsForShard(shard_id))
+            setFileProcessedForOrderedModeImpl(path, nullptr, zookeeper_processed_path / toString(processor));
+    }
+    else
+    {
+        setFileProcessedForOrderedModeImpl(path, nullptr, zookeeper_processed_path);
+    }
+}
+
void S3QueueFilesMetadata::setFileFailed(ProcessingNodeHolderPtr holder, const String & exception_message)
{
    auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueSetFileFailedMicroseconds);
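
The two hunks above implement a compare-and-set update of the "max processed file" watermark: the node is read together with its version, the update is submitted as a multi-request pinned to that version, and any conflict restarts the enclosing retry loop. A minimal sketch of the pattern, using only the zkutil helpers and names visible in the hunks (the explicit max_attempts bound is illustrative, not the storage's actual loop condition):

    /// Illustrative compare-and-set loop for a monotonically growing watermark node.
    for (size_t attempt = 0; attempt < max_attempts; ++attempt)
    {
        std::string res;
        Coordination::Stat stat;
        Coordination::Requests requests;
        if (zk_client->tryGet(processed_node_path, res, &stat))
        {
            /// Another writer may already have advanced the watermark past us.
            if (NodeMetadata::fromString(res).file_path >= path)
                return;
            /// The set succeeds only if the node is unchanged since we read it.
            requests.push_back(zkutil::makeSetRequest(processed_node_path, node_metadata, stat.version));
        }
        else
        {
            requests.push_back(zkutil::makeCreateRequest(processed_node_path, node_metadata, zkutil::CreateMode::Persistent));
        }

        Coordination::Responses responses;
        if (zk_client->tryMulti(requests, responses) == Coordination::Error::ZOK)
            return;
        /// Version mismatch or a concurrently created node: re-read and retry.
    }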


@@ -42,6 +42,7 @@ public:
    ~S3QueueFilesMetadata();

    void setFileProcessed(ProcessingNodeHolderPtr holder);
+   void setFileProcessed(const std::string & path, size_t shard_id);
    void setFileFailed(ProcessingNodeHolderPtr holder, const std::string & exception_message);
@@ -141,6 +142,9 @@ private:
    void setFileProcessedForUnorderedMode(ProcessingNodeHolderPtr holder);
    std::string getZooKeeperPathForShard(size_t shard_id) const;
+   void setFileProcessedForOrderedModeImpl(
+       const std::string & path, ProcessingNodeHolderPtr holder, const std::string & processed_node_path);

    enum class SetFileProcessingResult
    {
        Success,


@@ -22,6 +22,7 @@ class ASTStorage;
    M(UInt32, s3queue_loading_retries, 0, "Retry loading up to specified number of times", 0) \
    M(UInt32, s3queue_processing_threads_num, 1, "Number of processing threads", 0) \
    M(UInt32, s3queue_enable_logging_to_s3queue_log, 1, "Enable logging to system table system.s3queue_log", 0) \
+   M(String, s3queue_last_processed_path, "", "For Ordered mode. Files that have lexicographically smaller file name are considered already processed", 0) \
    M(UInt32, s3queue_tracked_file_ttl_sec, 0, "Maximum number of seconds to store processed files in ZooKeeper node (store forever by default)", 0) \
    M(UInt32, s3queue_polling_min_timeout_ms, 1000, "Minimal timeout before next polling", 0) \
    M(UInt32, s3queue_polling_max_timeout_ms, 10000, "Maximum timeout before next polling", 0) \


@@ -155,20 +155,20 @@ StorageS3Queue::StorageS3Queue(
    LOG_INFO(log, "Using zookeeper path: {}", zk_path.string());
    task = getContext()->getSchedulePool().createTask("S3QueueStreamingTask", [this] { threadFunc(); });

+   /// Get metadata manager from S3QueueMetadataFactory,
+   /// it will increase the ref count for the metadata object.
+   /// The ref count is decreased when StorageS3Queue::drop() method is called.
+   files_metadata = S3QueueMetadataFactory::instance().getOrCreate(zk_path, *s3queue_settings);
+
    try
    {
        createOrCheckMetadata(storage_metadata);
    }
    catch (...)
    {
+       S3QueueMetadataFactory::instance().remove(zk_path);
        throw;
    }

-   /// Get metadata manager from S3QueueMetadataFactory,
-   /// it will increase the ref count for the metadata object.
-   /// The ref count is decreased when StorageS3Queue::drop() method is called.
-   files_metadata = S3QueueMetadataFactory::instance().getOrCreate(zk_path, *s3queue_settings);
-
    if (files_metadata->isShardedProcessing())
    {
        if (!s3queue_settings->s3queue_current_shard_num.changed)
@@ -181,6 +181,10 @@ StorageS3Queue::StorageS3Queue(
            files_metadata->registerNewShard(s3queue_settings->s3queue_current_shard_num);
        }
    }
+
+   if (s3queue_settings->mode == S3QueueMode::ORDERED && !s3queue_settings->s3queue_last_processed_path.value.empty())
+   {
+       files_metadata->setFileProcessed(s3queue_settings->s3queue_last_processed_path.value, s3queue_settings->s3queue_current_shard_num);
+   }
}

void StorageS3Queue::startup()
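
Moving getOrCreate() ahead of the try block is what makes the new remove() call in the catch handler correct: the factory reference exists by the time createOrCheckMetadata() can throw, so the constructor either keeps the reference or releases it, and never leaks it. The same invariant can be written with a scope guard; a sketch under the assumption that base/scope_guard.h's release() semantics apply here:

    /// Illustrative alternative with equivalent exception safety (not the committed code).
    files_metadata = S3QueueMetadataFactory::instance().getOrCreate(zk_path, *s3queue_settings);
    scope_guard release_metadata([&] { S3QueueMetadataFactory::instance().remove(zk_path); });

    createOrCheckMetadata(storage_metadata); /// may throw; the guard then drops the reference

    release_metadata.release(); /// success: keep the factory reference alive until drop()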


@@ -1,6 +1,5 @@
#include <Core/Defines.h>
-#include <cstddef>
#include <ranges>
#include <chrono>
@@ -29,17 +28,14 @@
#include <Storages/AlterCommands.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/Freeze.h>
-#include <Storages/MergeTree/AsyncBlockIDsCache.h>
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/LeaderElection.h>
#include <Storages/MergeTree/MergeFromLogEntryTask.h>
-#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/MergeTreeBackgroundExecutor.h>
#include <Storages/MergeTree/MergeTreeDataFormatVersion.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/MergeTree/MergeTreePartitionCompatibilityVerifier.h>
-#include <Storages/MergeTree/MergeTreeReaderCompact.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/MutateFromLogEntryTask.h>
#include <Storages/MergeTree/PinnedPartUUIDs.h>
@@ -64,21 +60,16 @@
#include <Parsers/formatAST.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ASTInsertQuery.h>
-#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTFunction.h>
-#include <Parsers/ASTOptimizeQuery.h>
#include <Parsers/ASTPartition.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/queryToString.h>
-#include <Parsers/ASTCheckQuery.h>
-#include <Parsers/ExpressionListParsers.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/Sources/RemoteSource.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
-#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/Sinks/EmptySink.h>
#include <Planner/Utils.h>
@@ -106,9 +97,6 @@
#include <Backups/IRestoreCoordination.h>
#include <Backups/RestorerFromBackup.h>
-#include <Poco/DirectoryIterator.h>
-#include <base/scope_guard.h>
#include <Common/scope_guard_safe.h>
#include <boost/algorithm/string/join.hpp>


@@ -12,6 +12,8 @@
#include <Interpreters/ProcessList.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Common/ZooKeeper/ZooKeeper.h>
+#include <Common/ZooKeeper/ZooKeeperRetries.h>
+#include <Common/ZooKeeper/ZooKeeperWithFaultInjection.h>
#include <Common/typeid_cast.h>
#include <Columns/ColumnSet.h>
#include <Columns/ColumnConst.h>
@@ -426,7 +428,30 @@ void ReadFromSystemZooKeeper::applyFilters()

void ReadFromSystemZooKeeper::fillData(MutableColumns & res_columns)
{
-   zkutil::ZooKeeperPtr zookeeper = context->getZooKeeper();
+   QueryStatusPtr query_status = context->getProcessListElement();
+
+   const auto & settings = context->getSettingsRef();
+   /// Use insert settings for now in order not to introduce new settings.
+   /// Hopefully insert settings will also be unified and replaced with some generic retry settings.
+   ZooKeeperRetriesInfo retries_settings(
+       settings.insert_keeper_max_retries,
+       settings.insert_keeper_retry_initial_backoff_ms,
+       settings.insert_keeper_retry_max_backoff_ms);
+
+   ZooKeeperWithFaultInjection::Ptr zookeeper;
+   /// Handles reconnects when needed
+   auto get_zookeeper = [&] ()
+   {
+       if (!zookeeper || zookeeper->expired())
+       {
+           zookeeper = ZooKeeperWithFaultInjection::createInstance(
+               settings.insert_keeper_fault_injection_probability,
+               settings.insert_keeper_fault_injection_seed,
+               context->getZooKeeper(),
+               "", nullptr);
+       }
+       return zookeeper;
+   };

    if (paths.empty())
        throw Exception(ErrorCodes::BAD_ARGUMENTS,
@@ -448,6 +473,9 @@ void ReadFromSystemZooKeeper::fillData(MutableColumns & res_columns)
    std::unordered_set<String> added;
    while (!paths.empty())
    {
+       if (query_status)
+           query_status->checkTimeLimit();
+
        list_tasks.clear();
        std::vector<String> paths_to_list;
        while (!paths.empty() && static_cast<Int64>(list_tasks.size()) < max_inflight_requests)
@@ -470,7 +498,10 @@ void ReadFromSystemZooKeeper::fillData(MutableColumns & res_columns)
            paths_to_list.emplace_back(task.path_corrected);
            list_tasks.emplace_back(std::move(task));
        }
-       auto list_responses = zookeeper->tryGetChildren(paths_to_list);
+
+       zkutil::ZooKeeper::MultiTryGetChildrenResponse list_responses;
+       ZooKeeperRetriesControl("", nullptr, retries_settings, query_status).retryLoop(
+           [&]() { list_responses = get_zookeeper()->tryGetChildren(paths_to_list); });

        struct GetTask
        {
@@ -514,7 +545,9 @@ void ReadFromSystemZooKeeper::fillData(MutableColumns & res_columns)
            }
        }

-       auto get_responses = zookeeper->tryGet(paths_to_get);
+       zkutil::ZooKeeper::MultiTryGetResponse get_responses;
+       ZooKeeperRetriesControl("", nullptr, retries_settings, query_status).retryLoop(
+           [&]() { get_responses = get_zookeeper()->tryGet(paths_to_get); });

        for (size_t i = 0, size = get_tasks.size(); i < size; ++i)
        {
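
ZooKeeperRetriesControl::retryLoop re-runs the callback when a Keeper call fails with a transient session-level error, backing off between attempts, while the get_zookeeper lambda swaps in a fresh session once the old one expires. Roughly the manual loop one retryLoop call site replaces, as a sketch (the real retryLoop internals may differ; Coordination::isHardwareError and sleepForMilliseconds are the existing helpers from IKeeper.h and base/sleep.h):

    /// Illustrative expansion of the tryGetChildren retry above.
    UInt64 backoff_ms = settings.insert_keeper_retry_initial_backoff_ms;
    for (UInt64 retry = 0;; ++retry)
    {
        try
        {
            list_responses = get_zookeeper()->tryGetChildren(paths_to_list);
            break;
        }
        catch (const Coordination::Exception & e)
        {
            /// Only session-level failures are worth retrying; logical errors propagate.
            if (!Coordination::isHardwareError(e.code) || retry >= settings.insert_keeper_max_retries)
                throw;
            sleepForMilliseconds(backoff_ms);
            backoff_ms = std::min<UInt64>(backoff_ms * 2, settings.insert_keeper_retry_max_backoff_ms);
        }
    }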


@@ -99,6 +99,7 @@ def started_cluster():
            main_configs=[
                "configs/s3queue_log.xml",
            ],
+           stay_alive=True,
        )

        logging.info("Starting cluster...")
@@ -539,10 +540,7 @@ def test_multiple_tables_meta_mismatch(started_cluster):
            },
        )
    except QueryRuntimeException as e:
-       assert (
-           "Metadata with the same `s3queue_zookeeper_path` was already created but with different settings"
-           in str(e)
-       )
+       assert "Existing table metadata in ZooKeeper differs in engine mode" in str(e)
        failed = True

    assert failed is True
@@ -1283,3 +1281,108 @@ def test_settings_check(started_cluster):
    )

    node.query(f"DROP TABLE {table_name} SYNC")
+
+
+@pytest.mark.parametrize("processing_threads", [1, 5])
+def test_processed_file_setting(started_cluster, processing_threads):
+    node = started_cluster.instances["instance"]
+    table_name = f"test_processed_file_setting_{processing_threads}"
+    dst_table_name = f"{table_name}_dst"
+    keeper_path = f"/clickhouse/test_{table_name}"
+    files_path = f"{table_name}_data"
+    files_to_generate = 10
+
+    create_table(
+        started_cluster,
+        node,
+        table_name,
+        "ordered",
+        files_path,
+        additional_settings={
+            "keeper_path": keeper_path,
+            "s3queue_processing_threads_num": processing_threads,
+            "s3queue_last_processed_path": f"{files_path}/test_5.csv",
+        },
+    )
+    total_values = generate_random_files(
+        started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
+    )
+    create_mv(node, table_name, dst_table_name)
+
+    def get_count():
+        return int(node.query(f"SELECT count() FROM {dst_table_name}"))
+
+    expected_rows = 4
+    for _ in range(20):
+        if expected_rows == get_count():
+            break
+        time.sleep(1)
+    assert expected_rows == get_count()
+
+    node.restart_clickhouse()
+    time.sleep(10)
+
+    expected_rows = 4
+    for _ in range(20):
+        if expected_rows == get_count():
+            break
+        time.sleep(1)
+    assert expected_rows == get_count()
+
+
+@pytest.mark.parametrize("processing_threads", [1, 5])
+def test_processed_file_setting_distributed(started_cluster, processing_threads):
+    node = started_cluster.instances["instance"]
+    node_2 = started_cluster.instances["instance2"]
+    table_name = f"test_processed_file_setting_distributed_{processing_threads}"
+    dst_table_name = f"{table_name}_dst"
+    keeper_path = f"/clickhouse/test_{table_name}"
+    files_path = f"{table_name}_data"
+    files_to_generate = 10
+
+    for instance in [node, node_2]:
+        create_table(
+            started_cluster,
+            instance,
+            table_name,
+            "ordered",
+            files_path,
+            additional_settings={
+                "keeper_path": keeper_path,
+                "s3queue_processing_threads_num": processing_threads,
+                "s3queue_last_processed_path": f"{files_path}/test_5.csv",
+                "s3queue_total_shards_num": 2,
+            },
+        )
+
+    total_values = generate_random_files(
+        started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
+    )
+    for instance in [node, node_2]:
+        create_mv(instance, table_name, dst_table_name)
+
+    def get_count():
+        query = f"SELECT count() FROM {dst_table_name}"
+        return int(node.query(query)) + int(node_2.query(query))
+
+    expected_rows = 4
+    for _ in range(20):
+        if expected_rows == get_count():
+            break
+        time.sleep(1)
+    assert expected_rows == get_count()
+
+    for instance in [node, node_2]:
+        instance.restart_clickhouse()
+    time.sleep(10)
+
+    expected_rows = 4
+    for _ in range(20):
+        if expected_rows == get_count():
+            break
+        time.sleep(1)
+    assert expected_rows == get_count()


@@ -82,3 +82,12 @@ B 3 10
D 1 20
A 2 30
C \N 40
+-- test SELECT * ORDER BY ALL with no "all" column in the SELECT clause
+A 2 30
+B 3 10
+C \N 40
+D 1 20
+A 2 30
+B 3 10
+C \N 40
+D 1 20


@@ -87,3 +87,23 @@ SET allow_experimental_analyzer = 1;
SELECT a, b, all FROM order_by_all ORDER BY all, a;

DROP TABLE order_by_all;
+
+SELECT '-- test SELECT * ORDER BY ALL with no "all" column in the SELECT clause';
+
+CREATE TABLE order_by_all
+(
+    a String,
+    b Nullable(Int32),
+    c UInt64,
+)
+ENGINE = Memory;
+
+INSERT INTO order_by_all VALUES ('B', 3, 10), ('C', NULL, 40), ('D', 1, 20), ('A', 2, 30);
+
+SET allow_experimental_analyzer = 0;
+SELECT * FROM order_by_all ORDER BY ALL;
+
+SET allow_experimental_analyzer = 1;
+SELECT * FROM order_by_all ORDER BY ALL;
+
+DROP TABLE order_by_all;


@@ -1,2 +1,3 @@
990000
990000
+10


@@ -19,5 +19,9 @@ WITH filtered_groups AS (SELECT a FROM pr_1 WHERE a >= 10000)
SELECT count() FROM pr_2 INNER JOIN filtered_groups ON pr_2.a = filtered_groups.a
SETTINGS allow_experimental_analyzer = 0, allow_experimental_parallel_reading_from_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_two_shards', max_parallel_replicas = 3; -- { serverError SUPPORT_IS_DISABLED }

+-- Sanitizer
+SELECT count() FROM pr_2 JOIN numbers(10) as pr_1 ON pr_2.a = pr_1.number
+SETTINGS allow_experimental_parallel_reading_from_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_two_shards', max_parallel_replicas = 3;
+
DROP TABLE IF EXISTS pr_1;
DROP TABLE IF EXISTS pr_2;


@@ -0,0 +1,3 @@
+/keeper api_version
+/keeper feature_flags
+1


@@ -0,0 +1,22 @@
+-- Tags: zookeeper, no-parallel, no-fasttest
+
+SELECT path, name
+FROM system.zookeeper
+WHERE path = '/keeper'
+ORDER BY path, name
+SETTINGS
+    insert_keeper_retry_initial_backoff_ms = 1,
+    insert_keeper_retry_max_backoff_ms = 20,
+    insert_keeper_fault_injection_probability = 0.3,
+    insert_keeper_fault_injection_seed = 4,
+    log_comment = '02975_system_zookeeper_retries';
+
+SYSTEM FLUSH LOGS;
+
+-- Check that there were zk session failures
+SELECT ProfileEvents['ZooKeeperHardwareExceptions'] > 0
+FROM system.query_log
+WHERE current_database = currentDatabase() AND type = 'QueryFinish' AND log_comment = '02975_system_zookeeper_retries'
+ORDER BY event_time_microseconds DESC
+LIMIT 1;


@@ -59,7 +59,7 @@ int main(int argc, char *argv[])
        Poco::Logger::root().setChannel(channel);
        Poco::Logger::root().setLevel("trace");
    }
-   auto * logger = &Poco::Logger::get("keeper-dumper");
+   auto logger = getLogger("keeper-dumper");
    ResponsesQueue queue(std::numeric_limits<size_t>::max());
    SnapshotsQueue snapshots_queue{1};
    CoordinationSettingsPtr settings = std::make_shared<CoordinationSettings>();
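
The dumper now holds an owning logger handle returned by getLogger instead of a raw pointer into Poco's logger registry. A minimal usage sketch, assuming the getLogger helper shown in the hunk and that the LOG_* macros accept the pointer directly:

    /// Illustrative: owning logger handle instead of a raw registry pointer.
    LoggerPtr logger = getLogger("keeper-dumper");
    LOG_TRACE(logger, "Starting dump");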


@@ -1,3 +1,4 @@
+v24.1.2.5-stable 2024-02-02
v24.1.1.2048-stable 2024-01-30
v23.12.2.59-stable 2024-01-05
v23.12.1.1368-stable 2023-12-28
