Merge remote-tracking branch 'upstream/master' into HEAD

This commit is contained in:
Anton Popov 2024-08-08 14:32:58 +00:00
commit 31b17d12fd
355 changed files with 9330 additions and 1957 deletions

View File

@ -67,7 +67,7 @@ jobs:
if: ${{ !cancelled() }}
run: |
export WORKFLOW_RESULT_FILE="/tmp/workflow_results.json"
cat >> "$WORKFLOW_RESULT_FILE" << 'EOF'
cat > "$WORKFLOW_RESULT_FILE" << 'EOF'
${{ toJson(needs) }}
EOF
python3 ./tests/ci/ci_buddy.py --check-wf-status

2
.gitmodules vendored
View File

@ -341,7 +341,7 @@
url = https://github.com/graphitemaster/incbin.git
[submodule "contrib/usearch"]
path = contrib/usearch
url = https://github.com/unum-cloud/usearch.git
url = https://github.com/ClickHouse/usearch.git
[submodule "contrib/SimSIMD"]
path = contrib/SimSIMD
url = https://github.com/ashvardanian/SimSIMD.git

View File

@ -21,6 +21,7 @@
#include "Poco/Exception.h"
#include "Poco/Foundation.h"
#include "Poco/Mutex.h"
#include "Poco/Message.h"
namespace Poco
@ -78,6 +79,10 @@ public:
///
/// The default implementation just breaks into the debugger.
virtual void logMessageImpl(Message::Priority priority, const std::string & msg) {}
/// Write a messages to the log
/// Useful for logging from Poco
static void handle(const Exception & exc);
/// Invokes the currently registered ErrorHandler.
@ -87,6 +92,9 @@ public:
static void handle();
/// Invokes the currently registered ErrorHandler.
static void logMessage(Message::Priority priority, const std::string & msg);
/// Invokes the currently registered ErrorHandler to log a message.
static ErrorHandler * set(ErrorHandler * pHandler);
/// Registers the given handler as the current error handler.
///

View File

@ -8,7 +8,7 @@
// Copyright (c) 2005-2006, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// SPDX-License-Identifier: BSL-1.0
// SPDX-License-Identifier: BSL-1.0
//
@ -35,79 +35,91 @@ ErrorHandler::~ErrorHandler()
void ErrorHandler::exception(const Exception& exc)
{
poco_debugger_msg(exc.what());
poco_debugger_msg(exc.what());
}
void ErrorHandler::exception(const std::exception& exc)
{
poco_debugger_msg(exc.what());
poco_debugger_msg(exc.what());
}
void ErrorHandler::exception()
{
poco_debugger_msg("unknown exception");
poco_debugger_msg("unknown exception");
}
void ErrorHandler::handle(const Exception& exc)
{
FastMutex::ScopedLock lock(_mutex);
try
{
_pHandler->exception(exc);
}
catch (...)
{
}
FastMutex::ScopedLock lock(_mutex);
try
{
_pHandler->exception(exc);
}
catch (...)
{
}
}
void ErrorHandler::handle(const std::exception& exc)
{
FastMutex::ScopedLock lock(_mutex);
try
{
_pHandler->exception(exc);
}
catch (...)
{
}
FastMutex::ScopedLock lock(_mutex);
try
{
_pHandler->exception(exc);
}
catch (...)
{
}
}
void ErrorHandler::handle()
{
FastMutex::ScopedLock lock(_mutex);
try
{
_pHandler->exception();
}
catch (...)
{
}
FastMutex::ScopedLock lock(_mutex);
try
{
_pHandler->exception();
}
catch (...)
{
}
}
void ErrorHandler::logMessage(Message::Priority priority, const std::string & msg)
{
FastMutex::ScopedLock lock(_mutex);
try
{
_pHandler->logMessageImpl(priority, msg);
}
catch (...)
{
}
}
ErrorHandler* ErrorHandler::set(ErrorHandler* pHandler)
{
poco_check_ptr(pHandler);
poco_check_ptr(pHandler);
FastMutex::ScopedLock lock(_mutex);
ErrorHandler* pOld = _pHandler;
_pHandler = pHandler;
return pOld;
FastMutex::ScopedLock lock(_mutex);
ErrorHandler* pOld = _pHandler;
_pHandler = pHandler;
return pOld;
}
ErrorHandler* ErrorHandler::defaultHandler()
{
// NOTE: Since this is called to initialize the static _pHandler
// variable, sh has to be a local static, otherwise we run
// into static initialization order issues.
static SingletonHolder<ErrorHandler> sh;
return sh.get();
// NOTE: Since this is called to initialize the static _pHandler
// variable, sh has to be a local static, otherwise we run
// into static initialization order issues.
static SingletonHolder<ErrorHandler> sh;
return sh.get();
}

View File

@ -17,6 +17,7 @@
#include "Poco/Net/StreamSocketImpl.h"
#include "Poco/NumberFormatter.h"
#include "Poco/Timestamp.h"
#include "Poco/ErrorHandler.h"
#include <string.h> // FD_SET needs memset on some platforms, so we can't use <cstring>

View File

@ -8,7 +8,7 @@
// Copyright (c) 2005-2006, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// SPDX-License-Identifier: BSL-1.0
// SPDX-License-Identifier: BSL-1.0
//
@ -44,190 +44,194 @@ TCPServerConnectionFilter::~TCPServerConnectionFilter()
TCPServer::TCPServer(TCPServerConnectionFactory::Ptr pFactory, Poco::UInt16 portNumber, TCPServerParams::Ptr pParams):
_socket(ServerSocket(portNumber)),
_thread(threadName(_socket)),
_stopped(true)
{
Poco::ThreadPool& pool = Poco::ThreadPool::defaultPool();
if (pParams)
{
int toAdd = pParams->getMaxThreads() - pool.capacity();
if (toAdd > 0) pool.addCapacity(toAdd);
}
_pDispatcher = new TCPServerDispatcher(pFactory, pool, pParams);
_socket(ServerSocket(portNumber)),
_thread(threadName(_socket)),
_stopped(true)
{
Poco::ThreadPool& pool = Poco::ThreadPool::defaultPool();
if (pParams)
{
int toAdd = pParams->getMaxThreads() - pool.capacity();
if (toAdd > 0) pool.addCapacity(toAdd);
}
_pDispatcher = new TCPServerDispatcher(pFactory, pool, pParams);
}
TCPServer::TCPServer(TCPServerConnectionFactory::Ptr pFactory, const ServerSocket& socket, TCPServerParams::Ptr pParams):
_socket(socket),
_thread(threadName(socket)),
_stopped(true)
_socket(socket),
_thread(threadName(socket)),
_stopped(true)
{
Poco::ThreadPool& pool = Poco::ThreadPool::defaultPool();
if (pParams)
{
int toAdd = pParams->getMaxThreads() - pool.capacity();
if (toAdd > 0) pool.addCapacity(toAdd);
}
_pDispatcher = new TCPServerDispatcher(pFactory, pool, pParams);
Poco::ThreadPool& pool = Poco::ThreadPool::defaultPool();
if (pParams)
{
int toAdd = pParams->getMaxThreads() - pool.capacity();
if (toAdd > 0) pool.addCapacity(toAdd);
}
_pDispatcher = new TCPServerDispatcher(pFactory, pool, pParams);
}
TCPServer::TCPServer(TCPServerConnectionFactory::Ptr pFactory, Poco::ThreadPool& threadPool, const ServerSocket& socket, TCPServerParams::Ptr pParams):
_socket(socket),
_pDispatcher(new TCPServerDispatcher(pFactory, threadPool, pParams)),
_thread(threadName(socket)),
_stopped(true)
_socket(socket),
_pDispatcher(new TCPServerDispatcher(pFactory, threadPool, pParams)),
_thread(threadName(socket)),
_stopped(true)
{
}
TCPServer::~TCPServer()
{
try
{
stop();
_pDispatcher->release();
}
catch (...)
{
poco_unexpected();
}
try
{
stop();
_pDispatcher->release();
}
catch (...)
{
poco_unexpected();
}
}
const TCPServerParams& TCPServer::params() const
{
return _pDispatcher->params();
return _pDispatcher->params();
}
void TCPServer::start()
{
poco_assert (_stopped);
poco_assert (_stopped);
_stopped = false;
_thread.start(*this);
_stopped = false;
_thread.start(*this);
}
void TCPServer::stop()
{
if (!_stopped)
{
_stopped = true;
_thread.join();
_pDispatcher->stop();
}
if (!_stopped)
{
_stopped = true;
_thread.join();
_pDispatcher->stop();
}
}
void TCPServer::run()
{
while (!_stopped)
{
Poco::Timespan timeout(250000);
try
{
if (_socket.poll(timeout, Socket::SELECT_READ))
{
try
{
StreamSocket ss = _socket.acceptConnection();
if (!_pConnectionFilter || _pConnectionFilter->accept(ss))
{
// enable nodelay per default: OSX really needs that
while (!_stopped)
{
Poco::Timespan timeout(250000);
try
{
if (_socket.poll(timeout, Socket::SELECT_READ))
{
try
{
StreamSocket ss = _socket.acceptConnection();
if (!_pConnectionFilter || _pConnectionFilter->accept(ss))
{
// enable nodelay per default: OSX really needs that
#if defined(POCO_OS_FAMILY_UNIX)
if (ss.address().family() != AddressFamily::UNIX_LOCAL)
if (ss.address().family() != AddressFamily::UNIX_LOCAL)
#endif
{
ss.setNoDelay(true);
}
_pDispatcher->enqueue(ss);
}
}
catch (Poco::Exception& exc)
{
ErrorHandler::handle(exc);
}
catch (std::exception& exc)
{
ErrorHandler::handle(exc);
}
catch (...)
{
ErrorHandler::handle();
}
}
}
catch (Poco::Exception& exc)
{
ErrorHandler::handle(exc);
// possibly a resource issue since poll() failed;
// give some time to recover before trying again
Poco::Thread::sleep(50);
}
}
{
ss.setNoDelay(true);
}
_pDispatcher->enqueue(ss);
}
else
{
ErrorHandler::logMessage(Message::PRIO_WARNING, "Filtered out connection from " + ss.peerAddress().toString());
}
}
catch (Poco::Exception& exc)
{
ErrorHandler::handle(exc);
}
catch (std::exception& exc)
{
ErrorHandler::handle(exc);
}
catch (...)
{
ErrorHandler::handle();
}
}
}
catch (Poco::Exception& exc)
{
ErrorHandler::handle(exc);
// possibly a resource issue since poll() failed;
// give some time to recover before trying again
Poco::Thread::sleep(50);
}
}
}
int TCPServer::currentThreads() const
{
return _pDispatcher->currentThreads();
return _pDispatcher->currentThreads();
}
int TCPServer::maxThreads() const
{
return _pDispatcher->maxThreads();
return _pDispatcher->maxThreads();
}
int TCPServer::totalConnections() const
{
return _pDispatcher->totalConnections();
return _pDispatcher->totalConnections();
}
int TCPServer::currentConnections() const
{
return _pDispatcher->currentConnections();
return _pDispatcher->currentConnections();
}
int TCPServer::maxConcurrentConnections() const
{
return _pDispatcher->maxConcurrentConnections();
return _pDispatcher->maxConcurrentConnections();
}
int TCPServer::queuedConnections() const
{
return _pDispatcher->queuedConnections();
return _pDispatcher->queuedConnections();
}
int TCPServer::refusedConnections() const
{
return _pDispatcher->refusedConnections();
return _pDispatcher->refusedConnections();
}
void TCPServer::setConnectionFilter(const TCPServerConnectionFilter::Ptr& pConnectionFilter)
{
poco_assert (_stopped);
poco_assert (_stopped);
_pConnectionFilter = pConnectionFilter;
_pConnectionFilter = pConnectionFilter;
}
std::string TCPServer::threadName(const ServerSocket& socket)
{
std::string name("TCPServer: ");
name.append(socket.address().toString());
return name;
std::string name("TCPServer: ");
name.append(socket.address().toString());
return name;
}

View File

@ -8,7 +8,7 @@
// Copyright (c) 2005-2007, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// SPDX-License-Identifier: BSL-1.0
// SPDX-License-Identifier: BSL-1.0
//
@ -33,44 +33,44 @@ namespace Net {
class TCPConnectionNotification: public Notification
{
public:
TCPConnectionNotification(const StreamSocket& socket):
_socket(socket)
{
}
~TCPConnectionNotification()
{
}
const StreamSocket& socket() const
{
return _socket;
}
TCPConnectionNotification(const StreamSocket& socket):
_socket(socket)
{
}
~TCPConnectionNotification()
{
}
const StreamSocket& socket() const
{
return _socket;
}
private:
StreamSocket _socket;
StreamSocket _socket;
};
TCPServerDispatcher::TCPServerDispatcher(TCPServerConnectionFactory::Ptr pFactory, Poco::ThreadPool& threadPool, TCPServerParams::Ptr pParams):
_rc(1),
_pParams(pParams),
_currentThreads(0),
_totalConnections(0),
_currentConnections(0),
_maxConcurrentConnections(0),
_refusedConnections(0),
_stopped(false),
_pConnectionFactory(pFactory),
_threadPool(threadPool)
_rc(1),
_pParams(pParams),
_currentThreads(0),
_totalConnections(0),
_currentConnections(0),
_maxConcurrentConnections(0),
_refusedConnections(0),
_stopped(false),
_pConnectionFactory(pFactory),
_threadPool(threadPool)
{
poco_check_ptr (pFactory);
poco_check_ptr (pFactory);
if (!_pParams)
_pParams = new TCPServerParams;
if (_pParams->getMaxThreads() == 0)
_pParams->setMaxThreads(threadPool.capacity());
if (!_pParams)
_pParams = new TCPServerParams;
if (_pParams->getMaxThreads() == 0)
_pParams->setMaxThreads(threadPool.capacity());
}
@ -81,161 +81,184 @@ TCPServerDispatcher::~TCPServerDispatcher()
void TCPServerDispatcher::duplicate()
{
++_rc;
++_rc;
}
void TCPServerDispatcher::release()
{
if (--_rc == 0) delete this;
if (--_rc == 0) delete this;
}
void TCPServerDispatcher::run()
{
AutoPtr<TCPServerDispatcher> guard(this); // ensure object stays alive
AutoPtr<TCPServerDispatcher> guard(this); // ensure object stays alive
int idleTime = (int) _pParams->getThreadIdleTime().totalMilliseconds();
int idleTime = (int) _pParams->getThreadIdleTime().totalMilliseconds();
for (;;)
{
try
{
AutoPtr<Notification> pNf = _queue.waitDequeueNotification(idleTime);
if (pNf && !_stopped)
{
TCPConnectionNotification* pCNf = dynamic_cast<TCPConnectionNotification*>(pNf.get());
if (pCNf)
{
beginConnection();
if (!_stopped)
{
std::unique_ptr<TCPServerConnection> pConnection(_pConnectionFactory->createConnection(pCNf->socket()));
poco_check_ptr(pConnection.get());
pConnection->start();
}
/// endConnection() should be called after destroying TCPServerConnection,
/// otherwise currentConnections() could become zero while some connections are yet still alive.
endConnection();
}
}
}
catch (Poco::Exception &exc) { ErrorHandler::handle(exc); }
catch (std::exception &exc) { ErrorHandler::handle(exc); }
catch (...) { ErrorHandler::handle(); }
FastMutex::ScopedLock lock(_mutex);
if (_stopped || (_currentThreads > 1 && _queue.empty()))
{
--_currentThreads;
break;
}
}
for (;;)
{
try
{
AutoPtr<Notification> pNf = _queue.waitDequeueNotification(idleTime);
if (pNf && !_stopped)
{
TCPConnectionNotification* pCNf = dynamic_cast<TCPConnectionNotification*>(pNf.get());
if (pCNf)
{
beginConnection();
if (!_stopped)
{
std::unique_ptr<TCPServerConnection> pConnection(_pConnectionFactory->createConnection(pCNf->socket()));
poco_check_ptr(pConnection.get());
pConnection->start();
}
/// endConnection() should be called after destroying TCPServerConnection,
/// otherwise currentConnections() could become zero while some connections are yet still alive.
endConnection();
}
}
}
catch (Poco::Exception &exc) { ErrorHandler::handle(exc); }
catch (std::exception &exc) { ErrorHandler::handle(exc); }
catch (...) { ErrorHandler::handle(); }
FastMutex::ScopedLock lock(_mutex);
if (_stopped || (_currentThreads > 1 && _queue.empty()))
{
--_currentThreads;
break;
}
}
}
namespace
{
static const std::string threadName("TCPServerConnection");
static const std::string threadName("TCPServerConnection");
}
void TCPServerDispatcher::enqueue(const StreamSocket& socket)
{
FastMutex::ScopedLock lock(_mutex);
FastMutex::ScopedLock lock(_mutex);
if (_queue.size() < _pParams->getMaxQueued())
{
if (!_queue.hasIdleThreads() && _currentThreads < _pParams->getMaxThreads())
{
try
{
ErrorHandler::logMessage(Message::PRIO_TEST, "Queue size: " + std::to_string(_queue.size()) +
", current threads: " + std::to_string(_currentThreads) +
", threads in pool: " + std::to_string(_threadPool.allocated()) +
", current connections: " + std::to_string(_currentConnections));
if (_queue.size() < _pParams->getMaxQueued())
{
/// NOTE: the condition below is wrong.
/// Since the thread pool is shared between multiple servers/TCPServerDispatchers,
/// _currentThreads < _pParams->getMaxThreads() will be true when the pool is actually saturated.
/// As a result, queue is useless and connections never wait in queue.
/// Instead, we (mistakenly) think that we can create a thread for this connection, but we fail to create it
/// and the connection get rejected.
/// We could check _currentThreads < _threadPool.allocated() to make it work,
/// but it's not clear if we want to make it work
/// because it may be better to reject connection immediately if we don't have resources to handle it.
if (!_queue.hasIdleThreads() && _currentThreads < _pParams->getMaxThreads())
{
try
{
this->duplicate();
_threadPool.startWithPriority(_pParams->getThreadPriority(), *this, threadName);
++_currentThreads;
}
catch (Poco::Exception& exc)
{
_threadPool.startWithPriority(_pParams->getThreadPriority(), *this, threadName);
++_currentThreads;
}
catch (Poco::Exception& exc)
{
ErrorHandler::logMessage(Message::PRIO_WARNING, "Got an exception while starting thread for connection from " +
socket.peerAddress().toString());
ErrorHandler::handle(exc);
this->release();
++_refusedConnections;
std::cerr << "Got exception while starting thread for connection. Error code: "
<< exc.code() << ", message: '" << exc.displayText() << "'" << std::endl;
return;
}
}
_queue.enqueueNotification(new TCPConnectionNotification(socket));
}
else
{
++_refusedConnections;
}
++_refusedConnections;
return;
}
}
else if (!_queue.hasIdleThreads())
{
ErrorHandler::logMessage(Message::PRIO_TRACE, "Don't have idle threads, adding connection from " +
socket.peerAddress().toString() + " to the queue, size: " + std::to_string(_queue.size()));
}
_queue.enqueueNotification(new TCPConnectionNotification(socket));
}
else
{
ErrorHandler::logMessage(Message::PRIO_WARNING, "Refusing connection from " + socket.peerAddress().toString() +
", reached max queue size " + std::to_string(_pParams->getMaxQueued()));
++_refusedConnections;
}
}
void TCPServerDispatcher::stop()
{
_stopped = true;
_queue.clear();
_queue.wakeUpAll();
_stopped = true;
_queue.clear();
_queue.wakeUpAll();
}
int TCPServerDispatcher::currentThreads() const
{
return _currentThreads;
return _currentThreads;
}
int TCPServerDispatcher::maxThreads() const
{
FastMutex::ScopedLock lock(_mutex);
return _threadPool.capacity();
FastMutex::ScopedLock lock(_mutex);
return _threadPool.capacity();
}
int TCPServerDispatcher::totalConnections() const
{
return _totalConnections;
return _totalConnections;
}
int TCPServerDispatcher::currentConnections() const
{
return _currentConnections;
return _currentConnections;
}
int TCPServerDispatcher::maxConcurrentConnections() const
{
return _maxConcurrentConnections;
return _maxConcurrentConnections;
}
int TCPServerDispatcher::queuedConnections() const
{
return _queue.size();
return _queue.size();
}
int TCPServerDispatcher::refusedConnections() const
{
return _refusedConnections;
return _refusedConnections;
}
void TCPServerDispatcher::beginConnection()
{
FastMutex::ScopedLock lock(_mutex);
FastMutex::ScopedLock lock(_mutex);
++_totalConnections;
++_currentConnections;
if (_currentConnections > _maxConcurrentConnections)
_maxConcurrentConnections.store(_currentConnections);
++_totalConnections;
++_currentConnections;
if (_currentConnections > _maxConcurrentConnections)
_maxConcurrentConnections.store(_currentConnections);
}
void TCPServerDispatcher::endConnection()
{
--_currentConnections;
--_currentConnections;
}

2
contrib/qpl vendored

@ -1 +1 @@
Subproject commit d4715e0e79896b85612158e135ee1a85f3b3e04d
Subproject commit c2ced94c53c1ee22191201a59878e9280bc9b9b8

View File

@ -4,7 +4,6 @@ set (QPL_PROJECT_DIR "${ClickHouse_SOURCE_DIR}/contrib/qpl")
set (QPL_SRC_DIR "${ClickHouse_SOURCE_DIR}/contrib/qpl/sources")
set (QPL_BINARY_DIR "${ClickHouse_BINARY_DIR}/build/contrib/qpl")
set (EFFICIENT_WAIT OFF)
set (BLOCK_ON_FAULT ON)
set (LOG_HW_INIT OFF)
set (SANITIZE_MEMORY OFF)
set (SANITIZE_THREADS OFF)
@ -16,16 +15,20 @@ function(GetLibraryVersion _content _outputVar)
SET(${_outputVar} ${CMAKE_MATCH_1} PARENT_SCOPE)
endfunction()
set (QPL_VERSION 1.2.0)
set (QPL_VERSION 1.6.0)
message(STATUS "Intel QPL version: ${QPL_VERSION}")
# There are 5 source subdirectories under $QPL_SRC_DIR: isal, c_api, core-sw, middle-layer, c_api.
# Generate 8 library targets: middle_layer_lib, isal, isal_asm, qplcore_px, qplcore_avx512, qplcore_sw_dispatcher, core_iaa, middle_layer_lib.
# There are 5 source subdirectories under $QPL_SRC_DIR: c_api, core-iaa, core-sw, middle-layer and isal.
# Generate 8 library targets: qpl_c_api, core_iaa, qplcore_px, qplcore_avx512, qplcore_sw_dispatcher, middle_layer_lib, isal and isal_asm,
# which are then combined into static or shared qpl.
# Output ch_contrib::qpl by linking with 8 library targets.
# The qpl submodule comes with its own version of isal. It contains code which does not exist in upstream isal. It would be nice to link
# only upstream isal (ch_contrib::isal) but at this point we can't.
# Note, QPL has integrated a customized version of ISA-L to meet specific needs.
# This version has been significantly modified and there are no plans to maintain compatibility with the upstream version
# or upgrade the current copy.
## cmake/CompileOptions.cmake and automatic wrappers generation
# ==========================================================================
# Copyright (C) 2022 Intel Corporation
@ -442,6 +445,7 @@ function(generate_unpack_kernel_arrays current_directory PLATFORMS_LIST)
endforeach()
endfunction()
# [SUBDIR]isal
enable_language(ASM_NASM)
@ -479,7 +483,6 @@ set(ISAL_ASM_SRC ${QPL_SRC_DIR}/isal/igzip/igzip_body.asm
${QPL_SRC_DIR}/isal/igzip/igzip_set_long_icf_fg_04.asm
${QPL_SRC_DIR}/isal/igzip/igzip_set_long_icf_fg_06.asm
${QPL_SRC_DIR}/isal/igzip/igzip_multibinary.asm
${QPL_SRC_DIR}/isal/igzip/stdmac.asm
${QPL_SRC_DIR}/isal/crc/crc_multibinary.asm
${QPL_SRC_DIR}/isal/crc/crc32_gzip_refl_by8.asm
${QPL_SRC_DIR}/isal/crc/crc32_gzip_refl_by8_02.asm
@ -505,7 +508,6 @@ set_property(GLOBAL APPEND PROPERTY QPL_LIB_DEPS
# Setting external and internal interfaces for ISA-L library
target_include_directories(isal
PUBLIC $<BUILD_INTERFACE:${QPL_SRC_DIR}/isal/include>
PRIVATE ${QPL_SRC_DIR}/isal/include
PUBLIC ${QPL_SRC_DIR}/isal/igzip)
set_target_properties(isal PROPERTIES
@ -617,12 +619,9 @@ target_compile_options(qplcore_sw_dispatcher
# [SUBDIR]core-iaa
file(GLOB HW_PATH_SRC ${QPL_SRC_DIR}/core-iaa/sources/aecs/*.c
${QPL_SRC_DIR}/core-iaa/sources/aecs/*.cpp
${QPL_SRC_DIR}/core-iaa/sources/driver_loader/*.c
${QPL_SRC_DIR}/core-iaa/sources/driver_loader/*.cpp
${QPL_SRC_DIR}/core-iaa/sources/descriptors/*.c
${QPL_SRC_DIR}/core-iaa/sources/descriptors/*.cpp
${QPL_SRC_DIR}/core-iaa/sources/bit_rev.c)
${QPL_SRC_DIR}/core-iaa/sources/*.c)
# Create library
add_library(core_iaa OBJECT ${HW_PATH_SRC})
@ -634,31 +633,27 @@ target_include_directories(core_iaa
PRIVATE ${UUID_DIR}
PUBLIC $<BUILD_INTERFACE:${QPL_SRC_DIR}/core-iaa/include>
PUBLIC $<BUILD_INTERFACE:${QPL_SRC_DIR}/core-iaa/sources/include>
PRIVATE $<BUILD_INTERFACE:${QPL_PROJECT_DIR}/include> # status.h in own_checkers.h
PRIVATE $<BUILD_INTERFACE:${QPL_PROJECT_DIR}/sources/c_api> # own_checkers.h
PRIVATE $<BUILD_INTERFACE:${QPL_PROJECT_DIR}/include> # status.h in own_checkers.h
PRIVATE $<TARGET_PROPERTY:qpl_c_api,INTERFACE_INCLUDE_DIRECTORIES> # for own_checkers.h
PRIVATE $<TARGET_PROPERTY:qplcore_sw_dispatcher,INTERFACE_INCLUDE_DIRECTORIES>)
target_compile_features(core_iaa PRIVATE c_std_11)
target_compile_definitions(core_iaa PRIVATE QPL_BADARG_CHECK
PRIVATE $<$<BOOL:${BLOCK_ON_FAULT}>: BLOCK_ON_FAULT_ENABLED>
PRIVATE $<$<BOOL:${LOG_HW_INIT}>:LOG_HW_INIT>
PRIVATE $<$<BOOL:${DYNAMIC_LOADING_LIBACCEL_CONFIG}>:DYNAMIC_LOADING_LIBACCEL_CONFIG>)
# [SUBDIR]middle-layer
file(GLOB MIDDLE_LAYER_SRC
${QPL_SRC_DIR}/middle-layer/analytics/*.cpp
${QPL_SRC_DIR}/middle-layer/c_wrapper/*.cpp
${QPL_SRC_DIR}/middle-layer/checksum/*.cpp
${QPL_SRC_DIR}/middle-layer/accelerator/*.cpp
${QPL_SRC_DIR}/middle-layer/analytics/*.cpp
${QPL_SRC_DIR}/middle-layer/common/*.cpp
${QPL_SRC_DIR}/middle-layer/compression/*.cpp
${QPL_SRC_DIR}/middle-layer/compression/*/*.cpp
${QPL_SRC_DIR}/middle-layer/compression/*/*/*.cpp
${QPL_SRC_DIR}/middle-layer/dispatcher/*.cpp
${QPL_SRC_DIR}/middle-layer/other/*.cpp
${QPL_SRC_DIR}/middle-layer/util/*.cpp
${QPL_SRC_DIR}/middle-layer/inflate/*.cpp
${QPL_SRC_DIR}/core-iaa/sources/accelerator/*.cpp) # todo
${QPL_SRC_DIR}/middle-layer/util/*.cpp)
add_library(middle_layer_lib OBJECT
${MIDDLE_LAYER_SRC})
@ -667,6 +662,7 @@ set_property(GLOBAL APPEND PROPERTY QPL_LIB_DEPS
$<TARGET_OBJECTS:middle_layer_lib>)
target_compile_options(middle_layer_lib
PRIVATE $<$<C_COMPILER_ID:GNU,Clang>:$<$<CONFIG:Release>:-O3;-U_FORTIFY_SOURCE;-D_FORTIFY_SOURCE=2>>
PRIVATE ${QPL_LINUX_TOOLCHAIN_CPP_EMBEDDED_FLAGS})
target_compile_definitions(middle_layer_lib
@ -682,6 +678,7 @@ target_include_directories(middle_layer_lib
PRIVATE ${UUID_DIR}
PUBLIC $<BUILD_INTERFACE:${QPL_SRC_DIR}/middle-layer>
PUBLIC $<TARGET_PROPERTY:_qpl,INTERFACE_INCLUDE_DIRECTORIES>
PRIVATE $<TARGET_PROPERTY:qpl_c_api,INTERFACE_INCLUDE_DIRECTORIES>
PUBLIC $<TARGET_PROPERTY:qplcore_sw_dispatcher,INTERFACE_INCLUDE_DIRECTORIES>
PUBLIC $<TARGET_PROPERTY:isal,INTERFACE_INCLUDE_DIRECTORIES>
PUBLIC $<TARGET_PROPERTY:core_iaa,INTERFACE_INCLUDE_DIRECTORIES>)
@ -689,31 +686,54 @@ target_include_directories(middle_layer_lib
target_compile_definitions(middle_layer_lib PUBLIC -DQPL_LIB)
# [SUBDIR]c_api
file(GLOB_RECURSE QPL_C_API_SRC
${QPL_SRC_DIR}/c_api/*.c
${QPL_SRC_DIR}/c_api/*.cpp)
file(GLOB QPL_C_API_SRC
${QPL_SRC_DIR}/c_api/compression_operations/*.c
${QPL_SRC_DIR}/c_api/compression_operations/*.cpp
${QPL_SRC_DIR}/c_api/filter_operations/*.cpp
${QPL_SRC_DIR}/c_api/legacy_hw_path/*.c
${QPL_SRC_DIR}/c_api/legacy_hw_path/*.cpp
${QPL_SRC_DIR}/c_api/other_operations/*.cpp
${QPL_SRC_DIR}/c_api/serialization/*.cpp
${QPL_SRC_DIR}/c_api/*.cpp)
add_library(qpl_c_api OBJECT ${QPL_C_API_SRC})
target_include_directories(qpl_c_api
PUBLIC $<BUILD_INTERFACE:${QPL_SRC_DIR}/c_api/>
PUBLIC $<BUILD_INTERFACE:${QPL_SRC_DIR}/include/> $<INSTALL_INTERFACE:include>
PRIVATE $<TARGET_PROPERTY:middle_layer_lib,INTERFACE_INCLUDE_DIRECTORIES>)
set_target_properties(qpl_c_api PROPERTIES
$<$<C_COMPILER_ID:GNU,Clang>:C_STANDARD 17
CXX_STANDARD 17)
target_compile_options(qpl_c_api
PRIVATE $<$<C_COMPILER_ID:GNU,Clang>:$<$<CONFIG:Release>:-O3;-U_FORTIFY_SOURCE;-D_FORTIFY_SOURCE=2>>
PRIVATE $<$<COMPILE_LANG_AND_ID:CXX,GNU,Clang>:${QPL_LINUX_TOOLCHAIN_CPP_EMBEDDED_FLAGS}>)
target_compile_definitions(qpl_c_api
PUBLIC -DQPL_BADARG_CHECK # own_checkers.h
PUBLIC -DQPL_LIB # needed for middle_layer_lib
PUBLIC $<$<BOOL:${LOG_HW_INIT}>:LOG_HW_INIT>) # needed for middle_layer_lib
set_property(GLOBAL APPEND PROPERTY QPL_LIB_DEPS
$<TARGET_OBJECTS:qpl_c_api>)
# Final _qpl target
get_property(LIB_DEPS GLOBAL PROPERTY QPL_LIB_DEPS)
add_library(_qpl STATIC ${QPL_C_API_SRC} ${LIB_DEPS})
add_library(_qpl STATIC ${LIB_DEPS})
target_include_directories(_qpl
PUBLIC $<BUILD_INTERFACE:${QPL_PROJECT_DIR}/include/> $<INSTALL_INTERFACE:include>
PRIVATE $<TARGET_PROPERTY:middle_layer_lib,INTERFACE_INCLUDE_DIRECTORIES>
PRIVATE $<BUILD_INTERFACE:${QPL_SRC_DIR}/c_api>)
PUBLIC $<BUILD_INTERFACE:${QPL_PROJECT_DIR}/include/> $<INSTALL_INTERFACE:include>)
target_compile_options(_qpl
PRIVATE ${QPL_LINUX_TOOLCHAIN_CPP_EMBEDDED_FLAGS})
target_compile_definitions(_qpl
PRIVATE -DQPL_LIB
PRIVATE -DQPL_BADARG_CHECK
PRIVATE $<$<BOOL:${DYNAMIC_LOADING_LIBACCEL_CONFIG}>:DYNAMIC_LOADING_LIBACCEL_CONFIG>
PUBLIC -DENABLE_QPL_COMPRESSION)
target_link_libraries(_qpl
PRIVATE ch_contrib::accel-config
PRIVATE ch_contrib::isal)
PRIVATE ch_contrib::accel-config)
target_include_directories(_qpl SYSTEM BEFORE
PUBLIC "${QPL_PROJECT_DIR}/include"

2
contrib/usearch vendored

@ -1 +1 @@
Subproject commit 955c6f9c11adfd89c912e0d1643d160b4e9e543f
Subproject commit 30810452bec5d3d3aa0931bb5d761e2f09aa6356

2
contrib/zlib-ng vendored

@ -1 +1 @@
Subproject commit 50f0eae1a411764cd6d1e85b3ce471438acd3c1c
Subproject commit a2fbeffdc30a8b0ce6d54ee31208e2688eac4c9f

View File

@ -14,6 +14,8 @@ add_definitions(-DHAVE_VISIBILITY_HIDDEN)
add_definitions(-DHAVE_VISIBILITY_INTERNAL)
add_definitions(-DHAVE_BUILTIN_CTZ)
add_definitions(-DHAVE_BUILTIN_CTZLL)
add_definitions(-DHAVE_ATTRIBUTE_ALIGNED)
add_definitions(-DHAVE_POSIX_MEMALIGN)
set(ZLIB_ARCH_SRCS)
set(ZLIB_ARCH_HDRS)
@ -24,67 +26,74 @@ if(ARCH_AARCH64)
set(ARCHDIR "${SOURCE_DIR}/arch/arm")
add_definitions(-DARM_FEATURES)
add_definitions(-DHAVE_SYS_AUXV_H)
add_definitions(-DARM_AUXV_HAS_CRC32 -DARM_ASM_HWCAP)
add_definitions(-DARM_AUXV_HAS_NEON)
add_definitions(-DARM_ACLE_CRC_HASH)
add_definitions(-DARM_NEON_ADLER32 -DARM_NEON_CHUNKSET -DARM_NEON_SLIDEHASH)
add_definitions(-DARM_ACLE)
add_definitions(-DHAVE_ARM_ACLE_H)
add_definitions(-DARM_NEON)
add_definitions(-DARM_NEON_HASLD4)
list(APPEND ZLIB_ARCH_HDRS ${ARCHDIR}/arm.h)
list(APPEND ZLIB_ARCH_SRCS ${ARCHDIR}/armfeature.c)
list(APPEND ZLIB_ARCH_HDRS ${ARCHDIR}/arm_features.h)
list(APPEND ZLIB_ARCH_SRCS ${ARCHDIR}/arm_features.c)
set(ACLE_SRCS ${ARCHDIR}/crc32_acle.c ${ARCHDIR}/insert_string_acle.c)
list(APPEND ZLIB_ARCH_SRCS ${ACLE_SRCS})
set(NEON_SRCS ${ARCHDIR}/adler32_neon.c ${ARCHDIR}/chunkset_neon.c ${ARCHDIR}/slide_neon.c)
set(NEON_SRCS ${ARCHDIR}/adler32_neon.c ${ARCHDIR}/chunkset_neon.c
${ARCHDIR}/compare256_neon.c ${ARCHDIR}/slide_hash_neon.c)
list(APPEND ZLIB_ARCH_SRCS ${NEON_SRCS})
elseif(ARCH_PPC64LE)
set(ARCHDIR "${SOURCE_DIR}/arch/power")
add_definitions(-DPOWER8)
add_definitions(-DPOWER_FEATURES)
add_definitions(-DPOWER8_VSX_ADLER32)
add_definitions(-DPOWER8_VSX_SLIDEHASH)
add_definitions(-DHAVE_SYS_AUXV_H)
list(APPEND ZLIB_ARCH_HDRS ${ARCHDIR}/power.h)
list(APPEND ZLIB_ARCH_SRCS ${ARCHDIR}/power.c)
set(POWER8_SRCS ${ARCHDIR}/adler32_power8.c ${ARCHDIR}/slide_hash_power8.c)
if(POWER9)
add_definitions(-DPOWER9)
else()
add_definitions(-DPOWER8)
add_definitions(-DPOWER8_VSX)
add_definitions(-DPOWER8_VSX_CRC32)
endif()
list(APPEND ZLIB_ARCH_HDRS ${ARCHDIR}/power_features.h)
list(APPEND ZLIB_ARCH_SRCS ${ARCHDIR}/power_features.c)
set(POWER8_SRCS ${ARCHDIR}/adler32_power8.c ${ARCHDIR}/chunkset_power8.c ${ARCHDIR}/slide_hash_power8.c)
list(APPEND POWER8_SRCS ${ARCHDIR}/crc32_power8.c)
list(APPEND ZLIB_ARCH_SRCS ${POWER8_SRCS})
elseif(ARCH_AMD64)
set(ARCHDIR "${SOURCE_DIR}/arch/x86")
add_definitions(-DX86_FEATURES)
list(APPEND ZLIB_ARCH_HDRS ${ARCHDIR}/x86.h)
list(APPEND ZLIB_ARCH_SRCS ${ARCHDIR}/x86.c)
list(APPEND ZLIB_ARCH_HDRS ${ARCHDIR}/x86_features.h)
list(APPEND ZLIB_ARCH_SRCS ${ARCHDIR}/x86_features.c)
if(ENABLE_AVX2)
add_definitions(-DX86_AVX2 -DX86_AVX2_ADLER32 -DX86_AVX_CHUNKSET)
set(AVX2_SRCS ${ARCHDIR}/slide_avx.c)
list(APPEND AVX2_SRCS ${ARCHDIR}/chunkset_avx.c)
list(APPEND AVX2_SRCS ${ARCHDIR}/compare258_avx.c)
list(APPEND AVX2_SRCS ${ARCHDIR}/adler32_avx.c)
add_definitions(-DX86_AVX2)
set(AVX2_SRCS ${ARCHDIR}/slide_hash_avx2.c)
list(APPEND AVX2_SRCS ${ARCHDIR}/chunkset_avx2.c)
list(APPEND AVX2_SRCS ${ARCHDIR}/compare256_avx2.c)
list(APPEND AVX2_SRCS ${ARCHDIR}/adler32_avx2.c)
list(APPEND ZLIB_ARCH_SRCS ${AVX2_SRCS})
endif()
if(ENABLE_SSE42)
add_definitions(-DX86_SSE42_CRC_HASH)
set(SSE42_SRCS ${ARCHDIR}/insert_string_sse.c)
list(APPEND ZLIB_ARCH_SRCS ${SSE42_SRCS})
add_definitions(-DX86_SSE42_CRC_INTRIN)
add_definitions(-DX86_SSE42_CMP_STR)
set(SSE42_SRCS ${ARCHDIR}/compare258_sse.c)
add_definitions(-DX86_SSE42)
set(SSE42_SRCS ${ARCHDIR}/adler32_sse42.c ${ARCHDIR}/insert_string_sse42.c)
list(APPEND ZLIB_ARCH_SRCS ${SSE42_SRCS})
endif()
if(ENABLE_SSSE3)
add_definitions(-DX86_SSSE3 -DX86_SSSE3_ADLER32)
set(SSSE3_SRCS ${ARCHDIR}/adler32_ssse3.c)
add_definitions(-DX86_SSSE3)
set(SSSE3_SRCS ${ARCHDIR}/adler32_ssse3.c ${ARCHDIR}/chunkset_ssse3.c)
list(APPEND ZLIB_ARCH_SRCS ${SSSE3_SRCS})
endif()
if(ENABLE_PCLMULQDQ)
add_definitions(-DX86_PCLMULQDQ_CRC)
set(PCLMULQDQ_SRCS ${ARCHDIR}/crc_folding.c)
set(PCLMULQDQ_SRCS ${ARCHDIR}/crc32_pclmulqdq.c)
list(APPEND ZLIB_ARCH_SRCS ${PCLMULQDQ_SRCS})
endif()
add_definitions(-DX86_SSE2 -DX86_SSE2_CHUNKSET -DX86_SSE2_SLIDEHASH)
set(SSE2_SRCS ${ARCHDIR}/chunkset_sse.c ${ARCHDIR}/slide_sse.c)
add_definitions(-DX86_SSE2)
set(SSE2_SRCS ${ARCHDIR}/chunkset_sse2.c ${ARCHDIR}/compare256_sse2.c ${ARCHDIR}/slide_hash_sse2.c)
list(APPEND ZLIB_ARCH_SRCS ${SSE2_SRCS})
add_definitions(-DX86_NOCHECK_SSE2)
endif ()
@ -106,39 +115,45 @@ generate_cmakein(${SOURCE_DIR}/zconf.h.in ${CMAKE_CURRENT_BINARY_DIR}/zconf.h.cm
set(ZLIB_SRCS
${SOURCE_DIR}/adler32.c
${SOURCE_DIR}/adler32_fold.c
${SOURCE_DIR}/chunkset.c
${SOURCE_DIR}/compare258.c
${SOURCE_DIR}/compare256.c
${SOURCE_DIR}/compress.c
${SOURCE_DIR}/crc32.c
${SOURCE_DIR}/crc32_comb.c
${SOURCE_DIR}/cpu_features.c
${SOURCE_DIR}/crc32_braid.c
${SOURCE_DIR}/crc32_braid_comb.c
${SOURCE_DIR}/crc32_fold.c
${SOURCE_DIR}/deflate.c
${SOURCE_DIR}/deflate_fast.c
${SOURCE_DIR}/deflate_huff.c
${SOURCE_DIR}/deflate_medium.c
${SOURCE_DIR}/deflate_quick.c
${SOURCE_DIR}/deflate_rle.c
${SOURCE_DIR}/deflate_slow.c
${SOURCE_DIR}/deflate_stored.c
${SOURCE_DIR}/functable.c
${SOURCE_DIR}/infback.c
${SOURCE_DIR}/inffast.c
${SOURCE_DIR}/inflate.c
${SOURCE_DIR}/inftrees.c
${SOURCE_DIR}/insert_string.c
${SOURCE_DIR}/insert_string_roll.c
${SOURCE_DIR}/slide_hash.c
${SOURCE_DIR}/trees.c
${SOURCE_DIR}/uncompr.c
${SOURCE_DIR}/zutil.c
)
set(ZLIB_GZFILE_SRCS
${SOURCE_DIR}/gzlib.c
${SOURCE_DIR}/gzread.c
${CMAKE_CURRENT_BINARY_DIR}/gzread.c
${SOURCE_DIR}/gzwrite.c
)
set(ZLIB_ALL_SRCS ${ZLIB_SRCS} ${ZLIB_ARCH_SRCS})
set(ZLIB_ALL_SRCS ${ZLIB_SRCS} ${ZLIB_ARCH_SRCS} ${ZLIB_GZFILE_SRCS})
add_library(_zlib ${ZLIB_ALL_SRCS})
add_library(ch_contrib::zlib ALIAS _zlib)
# https://github.com/zlib-ng/zlib-ng/pull/733
# This is disabed by default
add_compile_definitions(Z_TLS=__thread)
if(HAVE_UNISTD_H)
SET(ZCONF_UNISTD_LINE "#if 1 /* was set to #if 1 by configure/cmake/etc */")
else()
@ -153,6 +168,9 @@ endif()
set(ZLIB_PC ${CMAKE_CURRENT_BINARY_DIR}/zlib.pc)
configure_file(${SOURCE_DIR}/zlib.pc.cmakein ${ZLIB_PC} @ONLY)
configure_file(${CMAKE_CURRENT_BINARY_DIR}/zconf.h.cmakein ${CMAKE_CURRENT_BINARY_DIR}/zconf.h @ONLY)
configure_file(${SOURCE_DIR}/zlib.h.in ${CMAKE_CURRENT_BINARY_DIR}/zlib.h @ONLY)
configure_file(${SOURCE_DIR}/zlib_name_mangling.h.in ${CMAKE_CURRENT_BINARY_DIR}/zlib_name_mangling.h @ONLY)
configure_file(${SOURCE_DIR}/gzread.c.in ${CMAKE_CURRENT_BINARY_DIR}/gzread.c @ONLY)
# We should use same defines when including zlib.h as used when zlib compiled
target_compile_definitions (_zlib PUBLIC ZLIB_COMPAT WITH_GZFILEOP)

View File

@ -40,8 +40,6 @@ fi
DATA_DIR="${CLICKHOUSE_DATA_DIR:-/var/lib/clickhouse}"
LOG_DIR="${LOG_DIR:-/var/log/clickhouse-keeper}"
LOG_PATH="${LOG_DIR}/clickhouse-keeper.log"
ERROR_LOG_PATH="${LOG_DIR}/clickhouse-keeper.err.log"
COORDINATION_DIR="${DATA_DIR}/coordination"
COORDINATION_LOG_DIR="${DATA_DIR}/coordination/log"
COORDINATION_SNAPSHOT_DIR="${DATA_DIR}/coordination/snapshots"
@ -84,7 +82,7 @@ if [[ $# -lt 1 ]] || [[ "$1" == "--"* ]]; then
# There is a config file. It is already tested with gosu (if it is readably by keeper user)
if [ -f "$KEEPER_CONFIG" ]; then
exec $gosu /usr/bin/clickhouse-keeper --config-file="$KEEPER_CONFIG" --log-file="$LOG_PATH" --errorlog-file="$ERROR_LOG_PATH" "$@"
exec $gosu /usr/bin/clickhouse-keeper --config-file="$KEEPER_CONFIG" "$@"
fi
# There is no config file. Will use embedded one

View File

@ -28,12 +28,14 @@ RUN echo "TSAN_OPTIONS='verbosity=1000 halt_on_error=1 abort_on_error=1 history_
RUN echo "UBSAN_OPTIONS='print_stacktrace=1 max_allocation_size_mb=32768'" >> /etc/environment
RUN echo "MSAN_OPTIONS='abort_on_error=1 poison_in_dtor=1 max_allocation_size_mb=32768'" >> /etc/environment
RUN echo "LSAN_OPTIONS='suppressions=/usr/share/clickhouse-test/config/lsan_suppressions.txt max_allocation_size_mb=32768'" >> /etc/environment
RUN echo "ASAN_OPTIONS='halt_on_error=1 abort_on_error=1'" >> /etc/environment
# Sanitizer options for current shell (not current, but the one that will be spawned on "docker run")
# (but w/o verbosity for TSAN, otherwise test.reference will not match)
ENV TSAN_OPTIONS='halt_on_error=1 abort_on_error=1 history_size=7 memory_limit_mb=46080 second_deadlock_stack=1 max_allocation_size_mb=32768'
ENV UBSAN_OPTIONS='print_stacktrace=1 max_allocation_size_mb=32768'
ENV MSAN_OPTIONS='abort_on_error=1 poison_in_dtor=1 max_allocation_size_mb=32768'
ENV LSAN_OPTIONS='max_allocation_size_mb=32768'
ENV ASAN_OPTIONS='halt_on_error=1 abort_on_error=1'
# for external_symbolizer_path, and also ensure that llvm-symbolizer really
# exists (since you don't want to fallback to addr2line, it is very slow)

View File

@ -193,53 +193,60 @@ function fuzz
kill -0 $server_pid
# Set follow-fork-mode to parent, because we attach to clickhouse-server, not to watchdog
# and clickhouse-server can do fork-exec, for example, to run some bridge.
# Do not set nostop noprint for all signals, because some it may cause gdb to hang,
# explicitly ignore non-fatal signals that are used by server.
# Number of SIGRTMIN can be determined only in runtime.
RTMIN=$(kill -l SIGRTMIN)
echo "
set follow-fork-mode parent
handle SIGHUP nostop noprint pass
handle SIGINT nostop noprint pass
handle SIGQUIT nostop noprint pass
handle SIGPIPE nostop noprint pass
handle SIGTERM nostop noprint pass
handle SIGUSR1 nostop noprint pass
handle SIGUSR2 nostop noprint pass
handle SIG$RTMIN nostop noprint pass
info signals
continue
backtrace full
thread apply all backtrace full
info registers
disassemble /s
up
disassemble /s
up
disassemble /s
p \"done\"
detach
quit
" > script.gdb
IS_ASAN=$(clickhouse-client --query "SELECT count() FROM system.build_options WHERE name = 'CXX_FLAGS' AND position('sanitize=address' IN value)")
if [[ "$IS_ASAN" = "1" ]];
then
echo "ASAN build detected. Not using gdb since it disables LeakSanitizer detections"
else
# Set follow-fork-mode to parent, because we attach to clickhouse-server, not to watchdog
# and clickhouse-server can do fork-exec, for example, to run some bridge.
# Do not set nostop noprint for all signals, because some it may cause gdb to hang,
# explicitly ignore non-fatal signals that are used by server.
# Number of SIGRTMIN can be determined only in runtime.
RTMIN=$(kill -l SIGRTMIN)
echo "
set follow-fork-mode parent
handle SIGHUP nostop noprint pass
handle SIGINT nostop noprint pass
handle SIGQUIT nostop noprint pass
handle SIGPIPE nostop noprint pass
handle SIGTERM nostop noprint pass
handle SIGUSR1 nostop noprint pass
handle SIGUSR2 nostop noprint pass
handle SIG$RTMIN nostop noprint pass
info signals
continue
backtrace full
thread apply all backtrace full
info registers
disassemble /s
up
disassemble /s
up
disassemble /s
p \"done\"
detach
quit
" > script.gdb
gdb -batch -command script.gdb -p $server_pid &
sleep 5
# gdb will send SIGSTOP, spend some time loading debug info, and then send SIGCONT, wait for it (up to send_timeout, 300s)
time clickhouse-client --query "SELECT 'Connected to clickhouse-server after attaching gdb'" ||:
gdb -batch -command script.gdb -p $server_pid &
sleep 5
# gdb will send SIGSTOP, spend some time loading debug info, and then send SIGCONT, wait for it (up to send_timeout, 300s)
time clickhouse-client --query "SELECT 'Connected to clickhouse-server after attaching gdb'" ||:
# Check connectivity after we attach gdb, because it might cause the server
# to freeze, and the fuzzer will fail. In debug build, it can take a lot of time.
for _ in {1..180}
do
if clickhouse-client --query "select 1"
then
break
fi
sleep 1
done
kill -0 $server_pid # This checks that it is our server that is started and not some other one
fi
# Check connectivity after we attach gdb, because it might cause the server
# to freeze, and the fuzzer will fail. In debug build, it can take a lot of time.
for _ in {1..180}
do
if clickhouse-client --query "select 1"
then
break
fi
sleep 1
done
kill -0 $server_pid # This checks that it is our server that is started and not some other one
echo 'Server started and responded.'
setup_logs_replication
@ -264,8 +271,13 @@ quit
# The fuzzer_pid belongs to the timeout process.
actual_fuzzer_pid=$(ps -o pid= --ppid "$fuzzer_pid")
echo "Attaching gdb to the fuzzer itself"
gdb -batch -command script.gdb -p $actual_fuzzer_pid &
if [[ "$IS_ASAN" = "1" ]];
then
echo "ASAN build detected. Not using gdb since it disables LeakSanitizer detections"
else
echo "Attaching gdb to the fuzzer itself"
gdb -batch -command script.gdb -p $actual_fuzzer_pid &
fi
# Wait for the fuzzer to complete.
# Note that the 'wait || ...' thing is required so that the script doesn't

View File

@ -74,6 +74,7 @@ protobuf==4.25.2
psycopg2-binary==2.9.6
py4j==0.10.9.5
py==1.11.0
pyarrow==17.0.0
pycparser==2.22
pycryptodome==3.20.0
pymongo==3.11.0

View File

@ -5,47 +5,53 @@ source /utils.lib
function attach_gdb_to_clickhouse()
{
# Set follow-fork-mode to parent, because we attach to clickhouse-server, not to watchdog
# and clickhouse-server can do fork-exec, for example, to run some bridge.
# Do not set nostop noprint for all signals, because some it may cause gdb to hang,
# explicitly ignore non-fatal signals that are used by server.
# Number of SIGRTMIN can be determined only in runtime.
RTMIN=$(kill -l SIGRTMIN)
# shellcheck disable=SC2016
echo "
set follow-fork-mode parent
handle SIGHUP nostop noprint pass
handle SIGINT nostop noprint pass
handle SIGQUIT nostop noprint pass
handle SIGPIPE nostop noprint pass
handle SIGTERM nostop noprint pass
handle SIGUSR1 nostop noprint pass
handle SIGUSR2 nostop noprint pass
handle SIG$RTMIN nostop noprint pass
info signals
continue
backtrace full
info registers
p "top 1 KiB of the stack:"
p/x *(uint64_t[128]*)"'$sp'"
maintenance info sections
thread apply all backtrace full
disassemble /s
up
disassemble /s
up
disassemble /s
p \"done\"
detach
quit
" > script.gdb
IS_ASAN=$(clickhouse-client --query "SELECT count() FROM system.build_options WHERE name = 'CXX_FLAGS' AND position('sanitize=address' IN value)")
if [[ "$IS_ASAN" = "1" ]];
then
echo "ASAN build detected. Not using gdb since it disables LeakSanitizer detections"
else
# Set follow-fork-mode to parent, because we attach to clickhouse-server, not to watchdog
# and clickhouse-server can do fork-exec, for example, to run some bridge.
# Do not set nostop noprint for all signals, because some it may cause gdb to hang,
# explicitly ignore non-fatal signals that are used by server.
# Number of SIGRTMIN can be determined only in runtime.
RTMIN=$(kill -l SIGRTMIN)
# shellcheck disable=SC2016
echo "
set follow-fork-mode parent
handle SIGHUP nostop noprint pass
handle SIGINT nostop noprint pass
handle SIGQUIT nostop noprint pass
handle SIGPIPE nostop noprint pass
handle SIGTERM nostop noprint pass
handle SIGUSR1 nostop noprint pass
handle SIGUSR2 nostop noprint pass
handle SIG$RTMIN nostop noprint pass
info signals
continue
backtrace full
info registers
p "top 1 KiB of the stack:"
p/x *(uint64_t[128]*)"'$sp'"
maintenance info sections
thread apply all backtrace full
disassemble /s
up
disassemble /s
up
disassemble /s
p \"done\"
detach
quit
" > script.gdb
# FIXME Hung check may work incorrectly because of attached gdb
# We cannot attach another gdb to get stacktraces if some queries hung
gdb -batch -command script.gdb -p "$(cat /var/run/clickhouse-server/clickhouse-server.pid)" | ts '%Y-%m-%d %H:%M:%S' >> /test_output/gdb.log &
sleep 5
# gdb will send SIGSTOP, spend some time loading debug info and then send SIGCONT, wait for it (up to send_timeout, 300s)
run_with_retry 60 clickhouse-client --query "SELECT 'Connected to clickhouse-server after attaching gdb'"
# FIXME Hung check may work incorrectly because of attached gdb
# We cannot attach another gdb to get stacktraces if some queries hung
gdb -batch -command script.gdb -p "$(cat /var/run/clickhouse-server/clickhouse-server.pid)" | ts '%Y-%m-%d %H:%M:%S' >> /test_output/gdb.log &
sleep 5
# gdb will send SIGSTOP, spend some time loading debug info and then send SIGCONT, wait for it (up to send_timeout, 300s)
run_with_retry 60 clickhouse-client --query "SELECT 'Connected to clickhouse-server after attaching gdb'"
fi
}
# vi: ft=bash

View File

@ -174,7 +174,7 @@ do
done
setup_logs_replication
attach_gdb_to_clickhouse || true # FIXME: to not break old builds, clean on 2023-09-01
attach_gdb_to_clickhouse
function fn_exists() {
declare -F "$1" > /dev/null;

View File

@ -308,7 +308,8 @@ function collect_query_and_trace_logs()
{
for table in query_log trace_log metric_log
do
clickhouse-local --config-file=/etc/clickhouse-server/config.xml --only-system-tables -q "select * from system.$table format TSVWithNamesAndTypes" | zstd --threads=0 > /test_output/$table.tsv.zst ||:
# Don't ignore errors here, it leads to ignore sanitizer reports when running clickhouse-local
clickhouse-local --config-file=/etc/clickhouse-server/config.xml --only-system-tables -q "select * from system.$table format TSVWithNamesAndTypes" | zstd --threads=0 > /test_output/$table.tsv.zst
done
}

View File

@ -4,4 +4,5 @@ ARG FROM_TAG=latest
FROM clickhouse/test-base:$FROM_TAG
COPY run.sh /
CMD ["/bin/bash", "/run.sh"]
RUN chmod +x run.sh
ENTRYPOINT ["/run.sh"]

View File

@ -1,5 +1,27 @@
#!/bin/bash
set -x
# Need to keep error from tests after `tee`. Otherwise we don't alert on asan errors
set -o pipefail
set -e
timeout 40m gdb -q -ex 'set print inferior-events off' -ex 'set confirm off' -ex 'set print thread-events off' -ex run -ex bt -ex quit --args ./unit_tests_dbms --gtest_output='json:test_output/test_result.json' | tee test_output/test_result.txt
if [ "$#" -ne 1 ]; then
echo "Expected exactly one argument"
exit 1
fi
if [ "$1" = "GDB" ];
then
timeout 40m \
gdb -q -ex "set print inferior-events off" -ex "set confirm off" -ex "set print thread-events off" -ex run -ex bt -ex quit --args \
./unit_tests_dbms --gtest_output='json:test_output/test_result.json' \
| tee test_output/test_result.txt
elif [ "$1" = "NO_GDB" ];
then
timeout 40m \
./unit_tests_dbms --gtest_output='json:test_output/test_result.json' \
| tee test_output/test_result.txt
else
echo "Unknown argument: $1"
exit 1
fi

View File

@ -27,23 +27,23 @@ Avoid dumping copies of external code into the library directory.
Instead create a Git submodule to pull third-party code from an external upstream repository.
All submodules used by ClickHouse are listed in the `.gitmodule` file.
If the library can be used as-is (the default case), you can reference the upstream repository directly.
If the library needs patching, create a fork of the upstream repository in the [ClickHouse organization on GitHub](https://github.com/ClickHouse).
- If the library can be used as-is (the default case), you can reference the upstream repository directly.
- If the library needs patching, create a fork of the upstream repository in the [ClickHouse organization on GitHub](https://github.com/ClickHouse).
In the latter case, we aim to isolate custom patches as much as possible from upstream commits.
To that end, create a branch with prefix `clickhouse/` from the branch or tag you want to integrate, e.g. `clickhouse/master` (for branch `master`) or `clickhouse/release/vX.Y.Z` (for tag `release/vX.Y.Z`).
This ensures that pulls from the upstream repository into the fork will leave custom `clickhouse/` branches unaffected.
Submodules in `contrib/` must only track `clickhouse/` branches of forked third-party repositories.
To that end, create a branch with prefix `ClickHouse/` from the branch or tag you want to integrate, e.g. `ClickHouse/2024_2` (for branch `2024_2`) or `ClickHouse/release/vX.Y.Z` (for tag `release/vX.Y.Z`).
Avoid following upstream development branches `master`/ `main` / `dev` (i.e., prefix branches `ClickHouse/master` / `ClickHouse/main` / `ClickHouse/dev` in the fork repository).
Such branches are moving targets which make proper versioning harder.
"Prefix branches" ensure that pulls from the upstream repository into the fork will leave custom `ClickHouse/` branches unaffected.
Submodules in `contrib/` must only track `ClickHouse/` branches of forked third-party repositories.
Patches are only applied against `clickhouse/` branches of external libraries.
For that, push the patch as a branch with `clickhouse/`, e.g. `clickhouse/fix-some-desaster`.
Then create a PR from the new branch against the custom tracking branch with `clickhouse/` prefix, (e.g. `clickhouse/master` or `clickhouse/release/vX.Y.Z`) and merge the patch.
Patches are only applied against `ClickHouse/` branches of external libraries.
There are two ways to do that:
- you like to make a new fix against a `ClickHouse/`-prefix branch in the forked repository, e.g. a sanitizer fix. In that case, push the fix as a branch with `ClickHouse/` prefix, e.g. `ClickHouse/fix-sanitizer-disaster`. Then create a PR from the new branch against the custom tracking branch, e.g. `ClickHouse/2024_2 <-- ClickHouse/fix-sanitizer-disaster` and merge the PR.
- you update the submodule and need to re-apply earlier patches. In this case, re-creating old PRs is overkill. Instead, simply cherry-pick older commits into the new `ClickHouse/` branch (corresponding to the new version). Feel free to squash commits of PRs that had multiple commits. In the best case, we did contribute custom patches back to upstream and can omit patches in the new version.
Once the submodule has been updated, bump the submodule in ClickHouse to point to the new hash in the fork.
Create patches of third-party libraries with the official repository in mind and consider contributing the patch back to the upstream repository.
This makes sure that others will also benefit from the patch and it will not be a maintenance burden for the ClickHouse team.
To pull upstream changes into the submodule, you can use two methods:
- (less work but less clean): merge upstream `master` into the corresponding `clickhouse/` tracking branch in the forked repository. You will need to resolve merge conflicts with previous custom patches. This method can be used when the `clickhouse/` branch tracks an upstream development branch like `master`, `main`, `dev`, etc.
- (more work but cleaner): create a new branch with `clickhouse/` prefix from the upstream commit or tag you like to integrate. Then re-apply all existing patches using new PRs (or squash them into a single PR). This method can be used when the `clickhouse/` branch tracks a specific upstream version branch or tag. It is cleaner in the sense that custom patches and upstream changes are better isolated from each other.
Once the submodule has been updated, bump the submodule in ClickHouse to point to the new hash in the fork.

View File

@ -146,6 +146,7 @@ Code: 48. DB::Exception: Received from localhost:9000. DB::Exception: Reading fr
- `_file` — Name of the file. Type: `LowCardinalty(String)`.
- `_size` — Size of the file in bytes. Type: `Nullable(UInt64)`. If the size is unknown, the value is `NULL`.
- `_time` — Last modified time of the file. Type: `Nullable(DateTime)`. If the time is unknown, the value is `NULL`.
- `_etag` — ETag of the file. Type: `LowCardinalty(String)`. If the etag is unknown, the value is `NULL`.
For more information about virtual columns see [here](../../../engines/table-engines/index.md#table_engines-virtual_columns).

View File

@ -379,7 +379,7 @@ You can mitigate this problem by enabling `wait_end_of_query=1` ([Response Buffe
However, this does not completely solve the problem because the result must still fit within the `http_response_buffer_size`, and other settings like `send_progress_in_http_headers` can interfere with the delay of the header.
The only way to catch all errors is to analyze the HTTP body before parsing it using the required format.
### Queries with Parameters {#cli-queries-with-parameters}
## Queries with Parameters {#cli-queries-with-parameters}
You can create a query with parameters and pass values for them from the corresponding HTTP request parameters. For more information, see [Queries with Parameters for CLI](../interfaces/cli.md#cli-queries-with-parameters).

View File

@ -4629,8 +4629,8 @@ Default Value: 5.
## memory_overcommit_ratio_denominator {#memory_overcommit_ratio_denominator}
It represents soft memory limit in case when hard limit is reached on user level.
This value is used to compute overcommit ratio for the query.
It represents the soft memory limit when the hard limit is reached on the global level.
This value is used to compute the overcommit ratio for the query.
Zero means skip the query.
Read more about [memory overcommit](memory-overcommit.md).
@ -4646,8 +4646,8 @@ Default value: `5000000`.
## memory_overcommit_ratio_denominator_for_user {#memory_overcommit_ratio_denominator_for_user}
It represents soft memory limit in case when hard limit is reached on global level.
This value is used to compute overcommit ratio for the query.
It represents the soft memory limit when the hard limit is reached on the user level.
This value is used to compute the overcommit ratio for the query.
Zero means skip the query.
Read more about [memory overcommit](memory-overcommit.md).
@ -5609,6 +5609,18 @@ Minimal size of block to compress in CROSS JOIN. Zero value means - disable this
Default value: `1GiB`.
## restore_replace_external_engines_to_null
For testing purposes. Replaces all external engines to Null to not initiate external connections.
Default value: `False`
## restore_replace_external_table_functions_to_null
For testing purposes. Replaces all external table functions to Null to not initiate external connections.
Default value: `False`
## disable_insertion_and_mutation
Disable all insert and mutations (alter table update / alter table delete / alter table drop partition). Set to true, can make this node focus on reading queries.

View File

@ -3,7 +3,7 @@ slug: /en/operations/system-tables/trace_log
---
# trace_log
Contains stack traces collected by the sampling query profiler.
Contains stack traces collected by the [sampling query profiler](../../operations/optimizing-performance/sampling-query-profiler.md).
ClickHouse creates this table when the [trace_log](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-trace_log) server configuration section is set. Also see settings: [query_profiler_real_time_period_ns](../../operations/settings/settings.md#query_profiler_real_time_period_ns), [query_profiler_cpu_time_period_ns](../../operations/settings/settings.md#query_profiler_cpu_time_period_ns), [memory_profiler_step](../../operations/settings/settings.md#memory_profiler_step),
[memory_profiler_sample_probability](../../operations/settings/settings.md#memory_profiler_sample_probability), [trace_profile_events](../../operations/settings/settings.md#trace_profile_events).

View File

@ -28,39 +28,39 @@ A client application to interact with clickhouse-keeper by its native protocol.
Connected to ZooKeeper at [::1]:9181 with session_id 137
/ :) ls
keeper foo bar
/ :) cd keeper
/ :) cd 'keeper'
/keeper :) ls
api_version
/keeper :) cd api_version
/keeper :) cd 'api_version'
/keeper/api_version :) ls
/keeper/api_version :) cd xyz
/keeper/api_version :) cd 'xyz'
Path /keeper/api_version/xyz does not exist
/keeper/api_version :) cd ../../
/ :) ls
keeper foo bar
/ :) get keeper/api_version
/ :) get 'keeper/api_version'
2
```
## Commands {#clickhouse-keeper-client-commands}
- `ls [path]` -- Lists the nodes for the given path (default: cwd)
- `cd [path]` -- Changes the working path (default `.`)
- `exists <path>` -- Returns `1` if node exists, `0` otherwise
- `set <path> <value> [version]` -- Updates the node's value. Only updates if version matches (default: -1)
- `create <path> <value> [mode]` -- Creates new node with the set value
- `touch <path>` -- Creates new node with an empty string as value. Doesn't throw an exception if the node already exists
- `get <path>` -- Returns the node's value
- `rm <path> [version]` -- Removes the node only if version matches (default: -1)
- `rmr <path>` -- Recursively deletes path. Confirmation required
- `ls '[path]'` -- Lists the nodes for the given path (default: cwd)
- `cd '[path]'` -- Changes the working path (default `.`)
- `exists '<path>'` -- Returns `1` if node exists, `0` otherwise
- `set '<path>' <value> [version]` -- Updates the node's value. Only updates if version matches (default: -1)
- `create '<path>' <value> [mode]` -- Creates new node with the set value
- `touch '<path>'` -- Creates new node with an empty string as value. Doesn't throw an exception if the node already exists
- `get '<path>'` -- Returns the node's value
- `rm '<path>' [version]` -- Removes the node only if version matches (default: -1)
- `rmr '<path>'` -- Recursively deletes path. Confirmation required
- `flwc <command>` -- Executes four-letter-word command
- `help` -- Prints this message
- `get_direct_children_number [path]` -- Get numbers of direct children nodes under a specific path
- `get_all_children_number [path]` -- Get all numbers of children nodes under a specific path
- `get_stat [path]` -- Returns the node's stat (default `.`)
- `find_super_nodes <threshold> [path]` -- Finds nodes with number of children larger than some threshold for the given path (default `.`)
- `get_direct_children_number '[path]'` -- Get numbers of direct children nodes under a specific path
- `get_all_children_number '[path]'` -- Get all numbers of children nodes under a specific path
- `get_stat '[path]'` -- Returns the node's stat (default `.`)
- `find_super_nodes <threshold> '[path]'` -- Finds nodes with number of children larger than some threshold for the given path (default `.`)
- `delete_stale_backups` -- Deletes ClickHouse nodes used for backups that are now inactive
- `find_big_family [path] [n]` -- Returns the top n nodes with the biggest family in the subtree (default path = `.` and n = 10)
- `sync <path>` -- Synchronizes node between processes and leader
- `sync '<path>'` -- Synchronizes node between processes and leader
- `reconfig <add|remove|set> "<arg>" [version]` -- Reconfigure Keeper cluster. See https://clickhouse.com/docs/en/guides/sre/keeper/clickhouse-keeper#reconfiguration

View File

@ -10,7 +10,7 @@ Calculates a concatenated string from a group of strings, optionally separated b
**Syntax**
``` sql
groupConcat(expression [, delimiter] [, limit]);
groupConcat[(delimiter [, limit])](expression);
```
**Arguments**
@ -20,7 +20,7 @@ groupConcat(expression [, delimiter] [, limit]);
- `limit` — A positive [integer](../../../sql-reference/data-types/int-uint.md) specifying the maximum number of elements to concatenate. If more elements are present, excess elements are ignored. This parameter is optional.
:::note
If delimiter is specified without limit, it must be the first parameter following the expression. If both delimiter and limit are specified, delimiter must precede limit.
If delimiter is specified without limit, it must be the first parameter. If both delimiter and limit are specified, delimiter must precede limit.
:::
**Returned value**
@ -61,7 +61,7 @@ This concatenates all names into one continuous string without any separator.
Query:
``` sql
SELECT groupConcat(Name, ', ', 2) FROM Employees;
SELECT groupConcat(', ')(Name) FROM Employees;
```
Result:
@ -78,7 +78,7 @@ This output shows the names separated by a comma followed by a space.
Query:
``` sql
SELECT groupConcat(Name, ', ', 2) FROM Employees;
SELECT groupConcat(', ', 2)(Name) FROM Employees;
```
Result:

View File

@ -3045,13 +3045,425 @@ toUInt256OrDefault('abc', CAST('0', 'UInt256')): 0
- [`toUInt256OrZero`](#touint256orzero).
- [`toUInt256OrNull`](#touint256ornull).
## toFloat(32\|64)
## toFloat32
## toFloat(32\|64)OrZero
Converts an input value to a value of type [`Float32`](../data-types/float.md). Throws an exception in case of an error.
## toFloat(32\|64)OrNull
**Syntax**
## toFloat(32\|64)OrDefault
```sql
toFloat32(expr)
```
**Arguments**
- `expr` — Expression returning a number or a string representation of a number. [Expression](../syntax.md/#syntax-expressions).
Supported arguments:
- Values of type (U)Int8/16/32/64/128/256.
- String representations of (U)Int8/16/32/128/256.
- Values of type Float32/64, including `NaN` and `Inf`.
- String representations of Float32/64, including `NaN` and `Inf` (case-insensitive).
Unsupported arguments:
- String representations of binary and hexadecimal values, e.g. `SELECT toFloat32('0xc0fe');`.
**Returned value**
- 32-bit floating point value. [Float32](../data-types/float.md).
**Example**
Query:
```sql
SELECT
toFloat32(42.7),
toFloat32('42.7'),
toFloat32('NaN')
FORMAT Vertical;
```
Result:
```response
Row 1:
──────
toFloat32(42.7): 42.7
toFloat32('42.7'): 42.7
toFloat32('NaN'): nan
```
**See also**
- [`toFloat32OrZero`](#tofloat32orzero).
- [`toFloat32OrNull`](#tofloat32ornull).
- [`toFloat32OrDefault`](#tofloat32ordefault).
## toFloat32OrZero
Like [`toFloat32`](#tofloat32), this function converts an input value to a value of type [Float32](../data-types/float.md) but returns `0` in case of an error.
**Syntax**
```sql
toFloat32OrZero(x)
```
**Arguments**
- `x` — A String representation of a number. [String](../data-types/string.md).
Supported arguments:
- String representations of (U)Int8/16/32/128/256, Float32/64.
Unsupported arguments (return `0`):
- String representations of binary and hexadecimal values, e.g. `SELECT toFloat32OrZero('0xc0fe');`.
**Returned value**
- 32-bit Float value if successful, otherwise `0`. [Float32](../data-types/float.md).
**Example**
Query:
```sql
SELECT
toFloat32OrZero('42.7'),
toFloat32OrZero('abc')
FORMAT Vertical;
```
Result:
```response
Row 1:
──────
toFloat32OrZero('42.7'): 42.7
toFloat32OrZero('abc'): 0
```
**See also**
- [`toFloat32`](#tofloat32).
- [`toFloat32OrNull`](#tofloat32ornull).
- [`toFloat32OrDefault`](#tofloat32ordefault).
## toFloat32OrNull
Like [`toFloat32`](#tofloat32), this function converts an input value to a value of type [Float32](../data-types/float.md) but returns `NULL` in case of an error.
**Syntax**
```sql
toFloat32OrNull(x)
```
**Arguments**
- `x` — A String representation of a number. [String](../data-types/string.md).
Supported arguments:
- String representations of (U)Int8/16/32/128/256, Float32/64.
Unsupported arguments (return `\N`):
- String representations of binary and hexadecimal values, e.g. `SELECT toFloat32OrNull('0xc0fe');`.
**Returned value**
- 32-bit Float value if successful, otherwise `\N`. [Float32](../data-types/float.md).
**Example**
Query:
```sql
SELECT
toFloat32OrNull('42.7'),
toFloat32OrNull('abc')
FORMAT Vertical;
```
Result:
```response
Row 1:
──────
toFloat32OrNull('42.7'): 42.7
toFloat32OrNull('abc'): ᴺᵁᴸᴸ
```
**See also**
- [`toFloat32`](#tofloat32).
- [`toFloat32OrZero`](#tofloat32orzero).
- [`toFloat32OrDefault`](#tofloat32ordefault).
## toFloat32OrDefault
Like [`toFloat32`](#tofloat32), this function converts an input value to a value of type [Float32](../data-types/float.md) but returns the default value in case of an error.
If no `default` value is passed then `0` is returned in case of an error.
**Syntax**
```sql
toFloat32OrDefault(expr[, default])
```
**Arguments**
- `expr` — Expression returning a number or a string representation of a number. [Expression](../syntax.md/#syntax-expressions) / [String](../data-types/string.md).
- `default` (optional) — The default value to return if parsing to type `Float32` is unsuccessful. [Float32](../data-types/float.md).
Supported arguments:
- Values of type (U)Int8/16/32/64/128/256.
- String representations of (U)Int8/16/32/128/256.
- Values of type Float32/64, including `NaN` and `Inf`.
- String representations of Float32/64, including `NaN` and `Inf` (case-insensitive).
Arguments for which the default value is returned:
- String representations of binary and hexadecimal values, e.g. `SELECT toFloat32OrDefault('0xc0fe', CAST('0', 'Float32'));`.
**Returned value**
- 32-bit Float value if successful, otherwise returns the default value if passed or `0` if not. [Float32](../data-types/float.md).
**Example**
Query:
```sql
SELECT
toFloat32OrDefault('8', CAST('0', 'Float32')),
toFloat32OrDefault('abc', CAST('0', 'Float32'))
FORMAT Vertical;
```
Result:
```response
Row 1:
──────
toFloat32OrDefault('8', CAST('0', 'Float32')): 8
toFloat32OrDefault('abc', CAST('0', 'Float32')): 0
```
**See also**
- [`toFloat32`](#tofloat32).
- [`toFloat32OrZero`](#tofloat32orzero).
- [`toFloat32OrNull`](#tofloat32ornull).
## toFloat64
Converts an input value to a value of type [`Float64`](../data-types/float.md). Throws an exception in case of an error.
**Syntax**
```sql
toFloat64(expr)
```
**Arguments**
- `expr` — Expression returning a number or a string representation of a number. [Expression](../syntax.md/#syntax-expressions).
Supported arguments:
- Values of type (U)Int8/16/32/64/128/256.
- String representations of (U)Int8/16/32/128/256.
- Values of type Float32/64, including `NaN` and `Inf`.
- String representations of type Float32/64, including `NaN` and `Inf` (case-insensitive).
Unsupported arguments:
- String representations of binary and hexadecimal values, e.g. `SELECT toFloat64('0xc0fe');`.
**Returned value**
- 64-bit floating point value. [Float64](../data-types/float.md).
**Example**
Query:
```sql
SELECT
toFloat64(42.7),
toFloat64('42.7'),
toFloat64('NaN')
FORMAT Vertical;
```
Result:
```response
Row 1:
──────
toFloat64(42.7): 42.7
toFloat64('42.7'): 42.7
toFloat64('NaN'): nan
```
**See also**
- [`toFloat64OrZero`](#tofloat64orzero).
- [`toFloat64OrNull`](#tofloat64ornull).
- [`toFloat64OrDefault`](#tofloat64ordefault).
## toFloat64OrZero
Like [`toFloat64`](#tofloat64), this function converts an input value to a value of type [Float64](../data-types/float.md) but returns `0` in case of an error.
**Syntax**
```sql
toFloat64OrZero(x)
```
**Arguments**
- `x` — A String representation of a number. [String](../data-types/string.md).
Supported arguments:
- String representations of (U)Int8/16/32/128/256, Float32/64.
Unsupported arguments (return `0`):
- String representations of binary and hexadecimal values, e.g. `SELECT toFloat64OrZero('0xc0fe');`.
**Returned value**
- 64-bit Float value if successful, otherwise `0`. [Float64](../data-types/float.md).
**Example**
Query:
```sql
SELECT
toFloat64OrZero('42.7'),
toFloat64OrZero('abc')
FORMAT Vertical;
```
Result:
```response
Row 1:
──────
toFloat64OrZero('42.7'): 42.7
toFloat64OrZero('abc'): 0
```
**See also**
- [`toFloat64`](#tofloat64).
- [`toFloat64OrNull`](#tofloat64ornull).
- [`toFloat64OrDefault`](#tofloat64ordefault).
## toFloat64OrNull
Like [`toFloat64`](#tofloat64), this function converts an input value to a value of type [Float64](../data-types/float.md) but returns `NULL` in case of an error.
**Syntax**
```sql
toFloat64OrNull(x)
```
**Arguments**
- `x` — A String representation of a number. [String](../data-types/string.md).
Supported arguments:
- String representations of (U)Int8/16/32/128/256, Float32/64.
Unsupported arguments (return `\N`):
- String representations of binary and hexadecimal values, e.g. `SELECT toFloat64OrNull('0xc0fe');`.
**Returned value**
- 64-bit Float value if successful, otherwise `\N`. [Float64](../data-types/float.md).
**Example**
Query:
```sql
SELECT
toFloat64OrNull('42.7'),
toFloat64OrNull('abc')
FORMAT Vertical;
```
Result:
```response
Row 1:
──────
toFloat64OrNull('42.7'): 42.7
toFloat64OrNull('abc'): ᴺᵁᴸᴸ
```
**See also**
- [`toFloat64`](#tofloat64).
- [`toFloat64OrZero`](#tofloat64orzero).
- [`toFloat64OrDefault`](#tofloat64ordefault).
## toFloat64OrDefault
Like [`toFloat64`](#tofloat64), this function converts an input value to a value of type [Float64](../data-types/float.md) but returns the default value in case of an error.
If no `default` value is passed then `0` is returned in case of an error.
**Syntax**
```sql
toFloat64OrDefault(expr[, default])
```
**Arguments**
- `expr` — Expression returning a number or a string representation of a number. [Expression](../syntax.md/#syntax-expressions) / [String](../data-types/string.md).
- `default` (optional) — The default value to return if parsing to type `Float64` is unsuccessful. [Float64](../data-types/float.md).
Supported arguments:
- Values of type (U)Int8/16/32/64/128/256.
- String representations of (U)Int8/16/32/128/256.
- Values of type Float32/64, including `NaN` and `Inf`.
- String representations of Float32/64, including `NaN` and `Inf` (case-insensitive).
Arguments for which the default value is returned:
- String representations of binary and hexadecimal values, e.g. `SELECT toFloat64OrDefault('0xc0fe', CAST('0', 'Float64'));`.
**Returned value**
- 64-bit Float value if successful, otherwise returns the default value if passed or `0` if not. [Float64](../data-types/float.md).
**Example**
Query:
```sql
SELECT
toFloat64OrDefault('8', CAST('0', 'Float64')),
toFloat64OrDefault('abc', CAST('0', 'Float64'))
FORMAT Vertical;
```
Result:
```response
Row 1:
──────
toFloat64OrDefault('8', CAST('0', 'Float64')): 8
toFloat64OrDefault('abc', CAST('0', 'Float64')): 0
```
**See also**
- [`toFloat64`](#tofloat64).
- [`toFloat64OrZero`](#tofloat64orzero).
- [`toFloat64OrNull`](#tofloat64ornull).
## toDate

View File

@ -241,12 +241,12 @@ CREATE OR REPLACE TABLE test
(
id UInt64,
size_bytes Int64,
size String Alias formatReadableSize(size_bytes)
size String ALIAS formatReadableSize(size_bytes)
)
ENGINE = MergeTree
ORDER BY id;
INSERT INTO test Values (1, 4678899);
INSERT INTO test VALUES (1, 4678899);
SELECT id, size_bytes, size FROM test;
┌─id─┬─size_bytes─┬─size─────┐
@ -497,7 +497,7 @@ If you perform a SELECT query mentioning a specific value in an encrypted column
```sql
CREATE TABLE mytable
(
x String Codec(AES_128_GCM_SIV)
x String CODEC(AES_128_GCM_SIV)
)
ENGINE = MergeTree ORDER BY x;
```

View File

@ -36,9 +36,10 @@ If you anticipate frequent deletes, consider using a [custom partitioning key](/
## Limitations of lightweight `DELETE`
### Lightweight `DELETE`s do not work with projections
### Lightweight `DELETE`s with projections
Currently, `DELETE` does not work for tables with projections. This is because rows in a projection may be affected by a `DELETE` operation and may require the projection to be rebuilt, negatively affecting `DELETE` performance.
By default, `DELETE` does not work for tables with projections. This is because rows in a projection may be affected by a `DELETE` operation and may require the projection to be rebuilt, negatively affecting `DELETE` performance.
However, there is an option to change this behavior. By changing setting `lightweight_mutation_projection_mode = 'drop'`, deletes will work with projections.
## Performance considerations when using lightweight `DELETE`

View File

@ -44,6 +44,7 @@ LIMIT 2
Paths may use globbing. Files must match the whole path pattern, not only the suffix or prefix.
- `*` — Represents arbitrarily many characters except `/` but including the empty string.
- `**` — Represents all files inside a folder recursively.
- `?` — Represents an arbitrary single character.
- `{some_string,another_string,yet_another_one}` — Substitutes any of strings `'some_string', 'another_string', 'yet_another_one'`. The strings can contain the `/` symbol.
- `{N..M}` — Represents any number `>= N` and `<= M`.

View File

@ -1,7 +1,7 @@
---
slug: /en/sql-reference/window-functions/lagInFrame
sidebar_label: lagInFrame
sidebar_position: 8
sidebar_position: 9
---
# lagInFrame

View File

@ -1,7 +1,7 @@
---
slug: /en/sql-reference/window-functions/leadInFrame
sidebar_label: leadInFrame
sidebar_position: 9
sidebar_position: 10
---
# leadInFrame

View File

@ -0,0 +1,72 @@
---
slug: /en/sql-reference/window-functions/percent_rank
sidebar_label: percent_rank
sidebar_position: 8
---
# percent_rank
returns the relative rank (i.e. percentile) of rows within a window partition.
**Syntax**
Alias: `percentRank` (case-sensitive)
```sql
percent_rank (column_name)
OVER ([[PARTITION BY grouping_column] [ORDER BY sorting_column]
[RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] | [window_name])
FROM table_name
WINDOW window_name as ([PARTITION BY grouping_column] [ORDER BY sorting_column] RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
```
The default and required window frame definition is `RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING`.
For more detail on window function syntax see: [Window Functions - Syntax](./index.md/#syntax).
**Example**
Query:
```sql
CREATE TABLE salaries
(
`team` String,
`player` String,
`salary` UInt32,
`position` String
)
Engine = Memory;
INSERT INTO salaries FORMAT Values
('Port Elizabeth Barbarians', 'Gary Chen', 195000, 'F'),
('New Coreystad Archdukes', 'Charles Juarez', 190000, 'F'),
('Port Elizabeth Barbarians', 'Michael Stanley', 150000, 'D'),
('New Coreystad Archdukes', 'Scott Harrison', 150000, 'D'),
('Port Elizabeth Barbarians', 'Robert George', 195000, 'M'),
('South Hampton Seagulls', 'Douglas Benson', 150000, 'M'),
('South Hampton Seagulls', 'James Henderson', 140000, 'M');
```
```sql
SELECT player, salary,
percent_rank() OVER (ORDER BY salary DESC) AS percent_rank
FROM salaries;
```
Result:
```response
┌─player──────────┬─salary─┬───────percent_rank─┐
1. │ Gary Chen │ 195000 │ 0 │
2. │ Robert George │ 195000 │ 0 │
3. │ Charles Juarez │ 190000 │ 0.3333333333333333 │
4. │ Michael Stanley │ 150000 │ 0.5 │
5. │ Scott Harrison │ 150000 │ 0.5 │
6. │ Douglas Benson │ 150000 │ 0.5 │
7. │ James Henderson │ 140000 │ 1 │
└─────────────────┴────────┴────────────────────┘
```

View File

@ -81,6 +81,7 @@ SELECT * FROM file('test.csv', 'CSV', 'column1 UInt32, column2 UInt32, column3 U
Обрабатываться будут те и только те файлы, которые существуют в файловой системе и удовлетворяют всему шаблону пути.
- `*` — заменяет любое количество любых символов кроме `/`, включая отсутствие символов.
- `**` — Заменяет любое количество любых символов, включая `/`, то есть осуществляет рекурсивный поиск по вложенным директориям.
- `?` — заменяет ровно один любой символ.
- `{some_string,another_string,yet_another_one}` — заменяет любую из строк `'some_string', 'another_string', 'yet_another_one'`. Эти строки также могут содержать символ `/`.
- `{N..M}` — заменяет любое число в интервале от `N` до `M` включительно (может содержать ведущие нули).

View File

@ -47,6 +47,7 @@ LIMIT 2
- `*` — Заменяет любое количество любых символов (кроме `/`), включая отсутствие символов.
- `**` — Заменяет любое количество любых символов, включая `/`, то есть осуществляет рекурсивный поиск по вложенным директориям.
- `?` — Заменяет ровно один любой символ.
- `{some_string,another_string,yet_another_one}` — Заменяет любую из строк `'some_string', 'another_string', 'yet_another_one'`. Эти строки также могут содержать символ `/`.
- `{N..M}` — Заменяет любое число в интервале от `N` до `M` включительно (может содержать ведущие нули).

View File

@ -38,7 +38,7 @@
#include <Server/HTTP/HTTPServer.h>
#include <Server/HTTPHandlerFactory.h>
#include <Server/KeeperReadinessHandler.h>
#include <Server/PrometheusMetricsWriter.h>
#include <Server/PrometheusRequestHandlerFactory.h>
#include <Server/TCPServer.h>
#include "Core/Defines.h"
@ -421,7 +421,7 @@ try
std::lock_guard lock(servers_lock);
metrics.reserve(servers->size());
for (const auto & server : *servers)
metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads()});
metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads(), server.refusedConnections()});
return metrics;
}
);
@ -509,14 +509,13 @@ try
auto address = socketBindListen(socket, listen_host, port);
socket.setReceiveTimeout(my_http_context->getReceiveTimeout());
socket.setSendTimeout(my_http_context->getSendTimeout());
auto metrics_writer = std::make_shared<KeeperPrometheusMetricsWriter>(config, "prometheus", async_metrics);
servers->emplace_back(
listen_host,
port_name,
"Prometheus: http://" + address.toString(),
std::make_unique<HTTPServer>(
std::move(my_http_context),
createPrometheusMainHandlerFactory(*this, config_getter(), metrics_writer, "PrometheusHandler-factory"),
createKeeperPrometheusHandlerFactory(*this, config_getter(), async_metrics, "PrometheusHandler-factory"),
server_pool,
socket,
http_params));

View File

@ -918,10 +918,10 @@ try
metrics.reserve(servers_to_start_before_tables.size() + servers.size());
for (const auto & server : servers_to_start_before_tables)
metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads()});
metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads(), server.refusedConnections()});
for (const auto & server : servers)
metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads()});
metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads(), server.refusedConnections()});
return metrics;
}
);

View File

@ -0,0 +1,117 @@
#pragma once
#include <AggregateFunctions/IAggregateFunction.h>
#include <Interpreters/WindowDescription.h>
#include <Common/AlignedBuffer.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
class WindowTransform;
// Interface for true window functions. It's not much of an interface, they just
// accept the guts of WindowTransform and do 'something'. Given a small number of
// true window functions, and the fact that the WindowTransform internals are
// pretty much well-defined in domain terms (e.g. frame boundaries), this is
// somewhat acceptable.
class IWindowFunction
{
public:
virtual ~IWindowFunction() = default;
// Must insert the result for current_row.
virtual void windowInsertResultInto(const WindowTransform * transform, size_t function_index) const = 0;
virtual std::optional<WindowFrame> getDefaultFrame() const { return {}; }
virtual ColumnPtr castColumn(const Columns &, const std::vector<size_t> &) { return nullptr; }
/// Is the frame type supported by this function.
virtual bool checkWindowFrameType(const WindowTransform * /*transform*/) const { return true; }
};
// Runtime data for computing one window function.
struct WindowFunctionWorkspace
{
AggregateFunctionPtr aggregate_function;
// Cached value of aggregate function isState virtual method
bool is_aggregate_function_state = false;
// This field is set for pure window functions. When set, we ignore the
// window_function.aggregate_function, and work through this interface
// instead.
IWindowFunction * window_function_impl = nullptr;
std::vector<size_t> argument_column_indices;
// Will not be initialized for a pure window function.
mutable AlignedBuffer aggregate_function_state;
// Argument columns. Be careful, this is a per-block cache.
std::vector<const IColumn *> argument_columns;
UInt64 cached_block_number = std::numeric_limits<UInt64>::max();
};
// A basic implementation for a true window function. It pretends to be an
// aggregate function, but refuses to work as such.
struct WindowFunction : public IAggregateFunctionHelper<WindowFunction>, public IWindowFunction
{
std::string name;
WindowFunction(
const std::string & name_, const DataTypes & argument_types_, const Array & parameters_, const DataTypePtr & result_type_)
: IAggregateFunctionHelper<WindowFunction>(argument_types_, parameters_, result_type_), name(name_)
{
}
bool isOnlyWindowFunction() const override { return true; }
[[noreturn]] void fail() const
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "The function '{}' can only be used as a window function, not as an aggregate function", getName());
}
String getName() const override { return name; }
void create(AggregateDataPtr __restrict) const override { }
void destroy(AggregateDataPtr __restrict) const noexcept override { }
bool hasTrivialDestructor() const override { return true; }
size_t sizeOfData() const override { return 0; }
size_t alignOfData() const override { return 1; }
void add(AggregateDataPtr __restrict, const IColumn **, size_t, Arena *) const override { fail(); }
void merge(AggregateDataPtr __restrict, ConstAggregateDataPtr, Arena *) const override { fail(); }
void serialize(ConstAggregateDataPtr __restrict, WriteBuffer &, std::optional<size_t>) const override { fail(); }
void deserialize(AggregateDataPtr __restrict, ReadBuffer &, std::optional<size_t>, Arena *) const override { fail(); }
void insertResultInto(AggregateDataPtr __restrict, IColumn &, Arena *) const override { fail(); }
};
template <typename State>
struct StatefulWindowFunction : public WindowFunction
{
StatefulWindowFunction(
const std::string & name_, const DataTypes & argument_types_, const Array & parameters_, const DataTypePtr & result_type_)
: WindowFunction(name_, argument_types_, parameters_, result_type_)
{
}
size_t sizeOfData() const override { return sizeof(State); }
size_t alignOfData() const override { return 1; }
void create(AggregateDataPtr __restrict place) const override { new (place) State(); }
void destroy(AggregateDataPtr __restrict place) const noexcept override { reinterpret_cast<State *>(place)->~State(); }
bool hasTrivialDestructor() const override { return std::is_trivially_destructible_v<State>; }
State & getState(const WindowFunctionWorkspace & workspace) const
{
return *reinterpret_cast<State *>(workspace.aggregate_function_state.data());
}
};
}

View File

@ -5,6 +5,7 @@
#include <Common/assert_cast.h>
#include <Common/FieldVisitorToString.h>
#include <Common/SipHash.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
@ -162,6 +163,7 @@ QueryTreeNodePtr ConstantNode::cloneImpl() const
ASTPtr ConstantNode::toASTImpl(const ConvertToASTOptions & options) const
{
const auto & constant_value_literal = constant_value->getValue();
const auto & constant_value_type = constant_value->getType();
auto constant_value_ast = std::make_shared<ASTLiteral>(constant_value_literal);
if (!options.add_cast_for_constants)
@ -169,7 +171,25 @@ ASTPtr ConstantNode::toASTImpl(const ConvertToASTOptions & options) const
if (requiresCastCall())
{
auto constant_type_name_ast = std::make_shared<ASTLiteral>(constant_value->getType()->getName());
/** Value for DateTime64 is Decimal64, which is serialized as a string literal.
* If we serialize it as is, DateTime64 would be parsed from that string literal, which can be incorrect.
* For example, DateTime64 cannot be parsed from the short value, like '1', while it's a valid Decimal64 value.
* It could also lead to ambiguous parsing because we don't know if the string literal represents a date or a Decimal64 literal.
* For this reason, we use a string literal representing a date instead of a Decimal64 literal.
*/
if (WhichDataType(constant_value_type->getTypeId()).isDateTime64())
{
const auto * date_time_type = typeid_cast<const DataTypeDateTime64 *>(constant_value_type.get());
DecimalField<Decimal64> decimal_value;
if (constant_value_literal.tryGet<DecimalField<Decimal64>>(decimal_value))
{
WriteBufferFromOwnString ostr;
writeDateTimeText(decimal_value.getValue(), date_time_type->getScale(), ostr, date_time_type->getTimeZone());
constant_value_ast = std::make_shared<ASTLiteral>(ostr.str());
}
}
auto constant_type_name_ast = std::make_shared<ASTLiteral>(constant_value_type->getName());
return makeASTFunction("_CAST", std::move(constant_value_ast), std::move(constant_type_name_ast));
}

View File

@ -46,7 +46,7 @@ public:
return;
const auto & storage_snapshot = table_node ? table_node->getStorageSnapshot() : table_function_node->getStorageSnapshot();
if (!storage->isVirtualColumn(column.name, storage_snapshot->getMetadataForQuery()))
if (!storage->isVirtualColumn(column.name, storage_snapshot->metadata))
return;
auto function_node = std::make_shared<FunctionNode>("shardNum");

View File

@ -78,6 +78,7 @@ add_headers_and_sources(clickhouse_common_io Common/Scheduler)
add_headers_and_sources(clickhouse_common_io Common/Scheduler/Nodes)
add_headers_and_sources(clickhouse_common_io IO)
add_headers_and_sources(clickhouse_common_io IO/Archives)
add_headers_and_sources(clickhouse_common_io IO/Protobuf)
add_headers_and_sources(clickhouse_common_io IO/S3)
add_headers_and_sources(clickhouse_common_io IO/AzureBlobStorage)
list (REMOVE_ITEM clickhouse_common_io_sources Common/malloc.cpp Common/new_delete.cpp)
@ -225,6 +226,7 @@ add_object_library(clickhouse_storages_liveview Storages/LiveView)
add_object_library(clickhouse_storages_windowview Storages/WindowView)
add_object_library(clickhouse_storages_s3queue Storages/ObjectStorageQueue)
add_object_library(clickhouse_storages_materializedview Storages/MaterializedView)
add_object_library(clickhouse_storages_time_series Storages/TimeSeries)
add_object_library(clickhouse_client Client)
# Always compile this file with the highest possible level of optimizations, even in Debug builds.
# https://github.com/ClickHouse/ClickHouse/issues/65745
@ -469,6 +471,7 @@ dbms_target_link_libraries (PUBLIC ch_contrib::sparsehash)
if (TARGET ch_contrib::protobuf)
dbms_target_link_libraries (PRIVATE ch_contrib::protobuf)
target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::protobuf)
endif ()
if (TARGET clickhouse_grpc_protos)

View File

@ -1613,7 +1613,7 @@ void AsynchronousMetrics::update(TimePoint update_time, bool force_update)
#endif
{
auto get_metric_name_doc = [](const String & name) -> std::pair<const char *, const char *>
auto threads_get_metric_name_doc = [](const String & name) -> std::pair<const char *, const char *>
{
static std::map<String, std::pair<const char *, const char *>> metric_map =
{
@ -1637,11 +1637,38 @@ void AsynchronousMetrics::update(TimePoint update_time, bool force_update)
return it->second;
};
auto rejected_connections_get_metric_name_doc = [](const String & name) -> std::pair<const char *, const char *>
{
static std::map<String, std::pair<const char *, const char *>> metric_map =
{
{"tcp_port", {"TCPRejectedConnections", "Number of rejected connections for the TCP protocol (without TLS)."}},
{"tcp_port_secure", {"TCPSecureRejectedConnections", "Number of rejected connections for the TCP protocol (with TLS)."}},
{"http_port", {"HTTPRejectedConnections", "Number of rejected connections for the HTTP interface (without TLS)."}},
{"https_port", {"HTTPSecureRejectedConnections", "Number of rejected connections for the HTTPS interface."}},
{"interserver_http_port", {"InterserverRejectedConnections", "Number of rejected connections for the replicas communication protocol (without TLS)."}},
{"interserver_https_port", {"InterserverSecureRejectedConnections", "Number of rejected connections for the replicas communication protocol (with TLS)."}},
{"mysql_port", {"MySQLRejectedConnections", "Number of rejected connections for the MySQL compatibility protocol."}},
{"postgresql_port", {"PostgreSQLRejectedConnections", "Number of rejected connections for the PostgreSQL compatibility protocol."}},
{"grpc_port", {"GRPCRejectedConnections", "Number of rejected connections for the GRPC protocol."}},
{"prometheus.port", {"PrometheusRejectedConnections", "Number of rejected connections for the Prometheus endpoint. Note: prometheus endpoints can be also used via the usual HTTP/HTTPs ports."}},
{"keeper_server.tcp_port", {"KeeperTCPRejectedConnections", "Number of rejected connections for the Keeper TCP protocol (without TLS)."}},
{"keeper_server.tcp_port_secure", {"KeeperTCPSecureRejectedConnections", "Number of rejected connections for the Keeper TCP protocol (with TLS)."}}
};
auto it = metric_map.find(name);
if (it == metric_map.end())
return { nullptr, nullptr };
else
return it->second;
};
const auto server_metrics = protocol_server_metrics_func();
for (const auto & server_metric : server_metrics)
{
if (auto name_doc = get_metric_name_doc(server_metric.port_name); name_doc.first != nullptr)
if (auto name_doc = threads_get_metric_name_doc(server_metric.port_name); name_doc.first != nullptr)
new_values[name_doc.first] = { server_metric.current_threads, name_doc.second };
if (auto name_doc = rejected_connections_get_metric_name_doc(server_metric.port_name); name_doc.first != nullptr)
new_values[name_doc.first] = { server_metric.rejected_connections, name_doc.second };
}
}

View File

@ -42,6 +42,7 @@ struct ProtocolServerMetrics
{
String port_name;
size_t current_threads;
size_t rejected_connections;
};
/** Periodically (by default, each second)

View File

@ -243,7 +243,7 @@ public:
}
/// Clear and finish queue
void clearAndFinish()
void clearAndFinish() noexcept
{
{
std::lock_guard lock(queue_mutex);

View File

@ -604,6 +604,10 @@
M(723, PARQUET_EXCEPTION) \
M(724, TOO_MANY_TABLES) \
M(725, TOO_MANY_DATABASES) \
M(726, UNEXPECTED_HTTP_HEADERS) \
M(727, UNEXPECTED_TABLE_ENGINE) \
M(728, UNEXPECTED_DATA_TYPE) \
M(729, ILLEGAL_TIME_SERIES_TAGS) \
\
M(900, DISTRIBUTED_CACHE_ERROR) \
M(901, CANNOT_USE_DISTRIBUTED_CACHE) \

View File

@ -2,6 +2,7 @@
#include <Poco/ErrorHandler.h>
#include <Common/Exception.h>
#include <Common/logger_useful.h>
/** ErrorHandler for Poco::Thread,
@ -26,8 +27,32 @@ public:
void exception(const std::exception &) override { logException(); }
void exception() override { logException(); }
void logMessageImpl(Poco::Message::Priority priority, const std::string & msg) override
{
switch (priority)
{
case Poco::Message::PRIO_FATAL: [[fallthrough]];
case Poco::Message::PRIO_CRITICAL:
LOG_FATAL(trace_log, fmt::runtime(msg)); break;
case Poco::Message::PRIO_ERROR:
LOG_ERROR(trace_log, fmt::runtime(msg)); break;
case Poco::Message::PRIO_WARNING:
LOG_WARNING(trace_log, fmt::runtime(msg)); break;
case Poco::Message::PRIO_NOTICE: [[fallthrough]];
case Poco::Message::PRIO_INFORMATION:
LOG_INFO(trace_log, fmt::runtime(msg)); break;
case Poco::Message::PRIO_DEBUG:
LOG_DEBUG(trace_log, fmt::runtime(msg)); break;
case Poco::Message::PRIO_TRACE:
LOG_TRACE(trace_log, fmt::runtime(msg)); break;
case Poco::Message::PRIO_TEST:
LOG_TEST(trace_log, fmt::runtime(msg)); break;
}
}
private:
LoggerPtr log = getLogger("ServerErrorHandler");
LoggerPtr trace_log = getLogger("Poco");
void logException()
{

View File

@ -60,6 +60,7 @@ static struct InitFiu
ONCE(receive_timeout_on_table_status_response) \
REGULAR(keepermap_fail_drop_data) \
REGULAR(lazy_pipe_fds_fail_close) \
PAUSEABLE(infinite_sleep) \
namespace FailPoints

View File

@ -459,6 +459,7 @@ The server successfully detected this situation and will download merged part fr
M(AzureDeleteObjects, "Number of Azure blob storage API DeleteObject(s) calls.") \
M(AzureListObjects, "Number of Azure blob storage API ListObjects calls.") \
M(AzureGetProperties, "Number of Azure blob storage API GetProperties calls.") \
M(AzureCreateContainer, "Number of Azure blob storage API CreateContainer calls.") \
\
M(DiskAzureGetObject, "Number of Disk Azure API GetObject calls.") \
M(DiskAzureUpload, "Number of Disk Azure blob storage API Upload calls") \
@ -466,8 +467,9 @@ The server successfully detected this situation and will download merged part fr
M(DiskAzureCommitBlockList, "Number of Disk Azure blob storage API CommitBlockList calls") \
M(DiskAzureCopyObject, "Number of Disk Azure blob storage API CopyObject calls") \
M(DiskAzureListObjects, "Number of Disk Azure blob storage API ListObjects calls.") \
M(DiskAzureDeleteObjects, "Number of Azure blob storage API DeleteObject(s) calls.") \
M(DiskAzureDeleteObjects, "Number of Disk Azure blob storage API DeleteObject(s) calls.") \
M(DiskAzureGetProperties, "Number of Disk Azure blob storage API GetProperties calls.") \
M(DiskAzureCreateContainer, "Number of Disk Azure blob storage API CreateContainer calls.") \
\
M(ReadBufferFromAzureMicroseconds, "Time spent on reading from Azure.") \
M(ReadBufferFromAzureInitMicroseconds, "Time spent initializing connection to Azure.") \

View File

@ -8,7 +8,9 @@ using namespace DB;
using ResourceTest = ResourceTestClass;
TEST(SchedulerFairPolicy, Factory)
/// Tests disabled because of leaks in the test themselves: https://github.com/ClickHouse/ClickHouse/issues/67678
TEST(DISABLED_SchedulerFairPolicy, Factory)
{
ResourceTest t;
@ -17,7 +19,7 @@ TEST(SchedulerFairPolicy, Factory)
EXPECT_TRUE(dynamic_cast<FairPolicy *>(fair.get()) != nullptr);
}
TEST(SchedulerFairPolicy, FairnessWeights)
TEST(DISABLED_SchedulerFairPolicy, FairnessWeights)
{
ResourceTest t;
@ -41,7 +43,7 @@ TEST(SchedulerFairPolicy, FairnessWeights)
t.consumed("B", 20);
}
TEST(SchedulerFairPolicy, Activation)
TEST(DISABLED_SchedulerFairPolicy, Activation)
{
ResourceTest t;
@ -77,7 +79,7 @@ TEST(SchedulerFairPolicy, Activation)
t.consumed("B", 10);
}
TEST(SchedulerFairPolicy, FairnessMaxMin)
TEST(DISABLED_SchedulerFairPolicy, FairnessMaxMin)
{
ResourceTest t;
@ -101,7 +103,7 @@ TEST(SchedulerFairPolicy, FairnessMaxMin)
t.consumed("A", 20);
}
TEST(SchedulerFairPolicy, HierarchicalFairness)
TEST(DISABLED_SchedulerFairPolicy, HierarchicalFairness)
{
ResourceTest t;

View File

@ -8,7 +8,9 @@ using namespace DB;
using ResourceTest = ResourceTestClass;
TEST(SchedulerPriorityPolicy, Factory)
/// Tests disabled because of leaks in the test themselves: https://github.com/ClickHouse/ClickHouse/issues/67678
TEST(DISABLED_SchedulerPriorityPolicy, Factory)
{
ResourceTest t;
@ -17,7 +19,7 @@ TEST(SchedulerPriorityPolicy, Factory)
EXPECT_TRUE(dynamic_cast<PriorityPolicy *>(prio.get()) != nullptr);
}
TEST(SchedulerPriorityPolicy, Priorities)
TEST(DISABLED_SchedulerPriorityPolicy, Priorities)
{
ResourceTest t;
@ -51,7 +53,7 @@ TEST(SchedulerPriorityPolicy, Priorities)
t.consumed("C", 0);
}
TEST(SchedulerPriorityPolicy, Activation)
TEST(DISABLED_SchedulerPriorityPolicy, Activation)
{
ResourceTest t;
@ -92,7 +94,7 @@ TEST(SchedulerPriorityPolicy, Activation)
t.consumed("C", 0);
}
TEST(SchedulerPriorityPolicy, SinglePriority)
TEST(DISABLED_SchedulerPriorityPolicy, SinglePriority)
{
ResourceTest t;

View File

@ -10,7 +10,9 @@ using namespace DB;
using ResourceTest = ResourceTestClass;
TEST(SchedulerThrottlerConstraint, LeakyBucketConstraint)
/// Tests disabled because of leaks in the test themselves: https://github.com/ClickHouse/ClickHouse/issues/67678
TEST(DISABLED_SchedulerThrottlerConstraint, LeakyBucketConstraint)
{
ResourceTest t;
EventQueue::TimePoint start = std::chrono::system_clock::now();
@ -40,7 +42,7 @@ TEST(SchedulerThrottlerConstraint, LeakyBucketConstraint)
t.consumed("A", 10);
}
TEST(SchedulerThrottlerConstraint, Unlimited)
TEST(DISABLED_SchedulerThrottlerConstraint, Unlimited)
{
ResourceTest t;
EventQueue::TimePoint start = std::chrono::system_clock::now();
@ -57,7 +59,7 @@ TEST(SchedulerThrottlerConstraint, Unlimited)
}
}
TEST(SchedulerThrottlerConstraint, Pacing)
TEST(DISABLED_SchedulerThrottlerConstraint, Pacing)
{
ResourceTest t;
EventQueue::TimePoint start = std::chrono::system_clock::now();
@ -77,7 +79,7 @@ TEST(SchedulerThrottlerConstraint, Pacing)
}
}
TEST(SchedulerThrottlerConstraint, BucketFilling)
TEST(DISABLED_SchedulerThrottlerConstraint, BucketFilling)
{
ResourceTest t;
EventQueue::TimePoint start = std::chrono::system_clock::now();
@ -111,7 +113,7 @@ TEST(SchedulerThrottlerConstraint, BucketFilling)
t.consumed("A", 3);
}
TEST(SchedulerThrottlerConstraint, PeekAndAvgLimits)
TEST(DISABLED_SchedulerThrottlerConstraint, PeekAndAvgLimits)
{
ResourceTest t;
EventQueue::TimePoint start = std::chrono::system_clock::now();
@ -139,7 +141,7 @@ TEST(SchedulerThrottlerConstraint, PeekAndAvgLimits)
}
}
TEST(SchedulerThrottlerConstraint, ThrottlerAndFairness)
TEST(DISABLED_SchedulerThrottlerConstraint, ThrottlerAndFairness)
{
ResourceTest t;
EventQueue::TimePoint start = std::chrono::system_clock::now();

View File

@ -14,20 +14,21 @@
/// because of broken getauxval() [1].
///
/// [1]: https://github.com/ClickHouse/ClickHouse/pull/33957
TEST(Common, LSan)
TEST(SanitizerDeathTest, LSan)
{
int sanitizers_exit_code = 1;
ASSERT_EXIT({
std::thread leak_in_thread([]()
EXPECT_DEATH(
{
void * leak = malloc(4096);
ASSERT_NE(leak, nullptr);
});
leak_in_thread.join();
std::thread leak_in_thread(
[]()
{
void * leak = malloc(4096);
ASSERT_NE(leak, nullptr);
});
leak_in_thread.join();
__lsan_do_leak_check();
}, ::testing::ExitedWithCode(sanitizers_exit_code), ".*LeakSanitizer: detected memory leaks.*");
__lsan_do_leak_check();
},
".*LeakSanitizer: detected memory leaks.*");
}
#endif

View File

@ -5,19 +5,19 @@
# If you want really small size of the resulted binary, just link with fuzz_compression and clickhouse_common_io
clickhouse_add_executable (compressed_buffer_fuzzer compressed_buffer_fuzzer.cpp)
target_link_libraries (compressed_buffer_fuzzer PRIVATE dbms)
target_link_libraries (compressed_buffer_fuzzer PRIVATE dbms clickhouse_functions)
clickhouse_add_executable (lz4_decompress_fuzzer lz4_decompress_fuzzer.cpp)
target_link_libraries (lz4_decompress_fuzzer PUBLIC dbms ch_contrib::lz4)
target_link_libraries (lz4_decompress_fuzzer PUBLIC dbms ch_contrib::lz4 clickhouse_functions)
clickhouse_add_executable (delta_decompress_fuzzer delta_decompress_fuzzer.cpp)
target_link_libraries (delta_decompress_fuzzer PRIVATE dbms)
target_link_libraries (delta_decompress_fuzzer PRIVATE dbms clickhouse_functions)
clickhouse_add_executable (double_delta_decompress_fuzzer double_delta_decompress_fuzzer.cpp)
target_link_libraries (double_delta_decompress_fuzzer PRIVATE dbms)
target_link_libraries (double_delta_decompress_fuzzer PRIVATE dbms clickhouse_functions)
clickhouse_add_executable (encrypted_decompress_fuzzer encrypted_decompress_fuzzer.cpp)
target_link_libraries (encrypted_decompress_fuzzer PRIVATE dbms)
target_link_libraries (encrypted_decompress_fuzzer PRIVATE dbms clickhouse_functions)
clickhouse_add_executable (gcd_decompress_fuzzer gcd_decompress_fuzzer.cpp)
target_link_libraries (gcd_decompress_fuzzer PRIVATE dbms)
target_link_libraries (gcd_decompress_fuzzer PRIVATE dbms clickhouse_functions)

View File

@ -893,6 +893,8 @@ class IColumn;
M(Bool, optimize_distinct_in_order, true, "Enable DISTINCT optimization if some columns in DISTINCT form a prefix of sorting. For example, prefix of sorting key in merge tree or ORDER BY statement", 0) \
M(Bool, keeper_map_strict_mode, false, "Enforce additional checks during operations on KeeperMap. E.g. throw an exception on an insert for already existing key", 0) \
M(UInt64, extract_key_value_pairs_max_pairs_per_row, 1000, "Max number of pairs that can be produced by the `extractKeyValuePairs` function. Used as a safeguard against consuming too much memory.", 0) ALIAS(extract_kvp_max_pairs_per_row) \
M(Bool, restore_replace_external_engines_to_null, false, "Replace all the external table engines to Null on restore. Useful for testing purposes", 0) \
M(Bool, restore_replace_external_table_functions_to_null, false, "Replace all table functions to Null on restore. Useful for testing purposes", 0) \
\
\
/* ###################################### */ \
@ -903,6 +905,7 @@ class IColumn;
M(Bool, allow_experimental_nlp_functions, false, "Enable experimental functions for natural language processing.", 0) \
M(Bool, allow_experimental_hash_functions, false, "Enable experimental hash functions", 0) \
M(Bool, allow_experimental_object_type, false, "Allow Object and JSON data types", 0) \
M(Bool, allow_experimental_time_series_table, false, "Allows experimental TimeSeries table engine", 0) \
M(Bool, allow_experimental_variant_type, false, "Allow Variant data type", 0) \
M(Bool, allow_experimental_dynamic_type, false, "Allow Dynamic data type", 0) \
M(Bool, allow_experimental_annoy_index, false, "Allows to use Annoy index. Disabled by default because this feature is experimental", 0) \

View File

@ -75,9 +75,12 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
},
{"24.8",
{
{"restore_replace_external_table_functions_to_null", false, false, "New setting."},
{"restore_replace_external_engines_to_null", false, false, "New setting."},
{"input_format_json_max_depth", 1000000, 1000, "It was unlimited in previous versions, but that was unsafe."},
{"merge_tree_min_bytes_per_task_for_remote_reading", 4194304, 2097152, "Value is unified with `filesystem_prefetch_min_bytes_for_single_read_task`"},
{"allow_archive_path_syntax", true, true, "Added new setting to allow disabling archive path syntax."},
{"allow_experimental_time_series_table", false, false, "Added new setting to allow the TimeSeries table engine"},
{"enable_analyzer", 1, 1, "Added an alias to a setting `allow_experimental_analyzer`."},
}
},

View File

@ -177,6 +177,11 @@ IMPLEMENT_SETTING_ENUM(LightweightMutationProjectionMode, ErrorCodes::BAD_ARGUME
{{"throw", LightweightMutationProjectionMode::THROW},
{"drop", LightweightMutationProjectionMode::DROP}})
IMPLEMENT_SETTING_ENUM(DeduplicateMergeProjectionMode, ErrorCodes::BAD_ARGUMENTS,
{{"throw", DeduplicateMergeProjectionMode::THROW},
{"drop", DeduplicateMergeProjectionMode::DROP},
{"rebuild", DeduplicateMergeProjectionMode::REBUILD}})
IMPLEMENT_SETTING_AUTO_ENUM(LocalFSReadMethod, ErrorCodes::BAD_ARGUMENTS)
IMPLEMENT_SETTING_ENUM(ParquetVersion, ErrorCodes::BAD_ARGUMENTS,

View File

@ -315,6 +315,15 @@ enum class LightweightMutationProjectionMode : uint8_t
DECLARE_SETTING_ENUM(LightweightMutationProjectionMode)
enum class DeduplicateMergeProjectionMode : uint8_t
{
THROW,
DROP,
REBUILD,
};
DECLARE_SETTING_ENUM(DeduplicateMergeProjectionMode)
DECLARE_SETTING_ENUM(LocalFSReadMethod)
enum class ObjectStorageQueueMode : uint8_t

View File

@ -33,6 +33,16 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
DataTypeAggregateFunction::DataTypeAggregateFunction(AggregateFunctionPtr function_, const DataTypes & argument_types_,
const Array & parameters_, std::optional<size_t> version_)
: function(std::move(function_))
, argument_types(argument_types_)
, parameters(parameters_)
, version(version_)
{
}
String DataTypeAggregateFunction::getFunctionName() const
{
return function->getName();

View File

@ -30,13 +30,7 @@ public:
static constexpr bool is_parametric = true;
DataTypeAggregateFunction(AggregateFunctionPtr function_, const DataTypes & argument_types_,
const Array & parameters_, std::optional<size_t> version_ = std::nullopt)
: function(std::move(function_))
, argument_types(argument_types_)
, parameters(parameters_)
, version(version_)
{
}
const Array & parameters_, std::optional<size_t> version_ = std::nullopt);
size_t getVersion() const;

View File

@ -90,7 +90,9 @@ void IDataType::forEachSubcolumn(
{
auto name = ISerialization::getSubcolumnNameForStream(subpath, prefix_len);
auto subdata = ISerialization::createFromPath(subpath, prefix_len);
callback(subpath, name, subdata);
auto path_copy = subpath;
path_copy.resize(prefix_len);
callback(path_copy, name, subdata);
}
subpath[i].visited = true;
}

View File

@ -8,6 +8,7 @@
#include <DataTypes/DataTypeNothing.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeNested.h>
@ -66,6 +67,36 @@ DataTypePtr getBaseTypeOfArray(const DataTypePtr & type)
return last_array ? last_array->getNestedType() : type;
}
DataTypePtr getBaseTypeOfArray(DataTypePtr type, const Names & tuple_elements)
{
auto it = tuple_elements.begin();
while (true)
{
if (const auto * type_array = typeid_cast<const DataTypeArray *>(type.get()))
{
type = type_array->getNestedType();
}
else if (const auto * type_tuple = typeid_cast<const DataTypeTuple *>(type.get()))
{
if (it == tuple_elements.end())
break;
auto pos = type_tuple->tryGetPositionByName(*it);
if (!pos)
break;
++it;
type = type_tuple->getElement(*pos);
}
else
{
break;
}
}
return type;
}
ColumnPtr getBaseColumnOfArray(const ColumnPtr & column)
{
/// Get raw pointers to avoid extra copying of column pointers.

View File

@ -27,6 +27,9 @@ size_t getNumberOfDimensions(const IColumn & column);
/// Returns type of scalars of Array of arbitrary dimensions.
DataTypePtr getBaseTypeOfArray(const DataTypePtr & type);
/// The same as above but takes into account Tuples of Nested.
DataTypePtr getBaseTypeOfArray(DataTypePtr type, const Names & tuple_elements);
/// Returns Array type with requested scalar type and number of dimensions.
DataTypePtr createArrayOfType(DataTypePtr type, size_t num_dimensions);

View File

@ -195,7 +195,7 @@ public:
/// Types of substreams that can have arbitrary name.
static const std::set<Type> named_types;
Type type;
Type type = Type::Regular;
/// The name of a variant element type.
String variant_element_name;
@ -212,6 +212,7 @@ public:
/// Flag, that may help to traverse substream paths.
mutable bool visited = false;
Substream() = default;
Substream(Type type_) : type(type_) {} /// NOLINT
String toString() const;
};

View File

@ -11,6 +11,14 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <Interpreters/Context.h>
namespace ProfileEvents
{
extern const Event AzureGetProperties;
extern const Event DiskAzureGetProperties;
extern const Event AzureCreateContainer;
extern const Event DiskAzureCreateContainer;
}
namespace DB
{
@ -214,20 +222,53 @@ void processURL(const String & url, const String & container_name, Endpoint & en
}
}
static bool containerExists(const ContainerClient & client)
{
ProfileEvents::increment(ProfileEvents::AzureGetProperties);
if (client.GetClickhouseOptions().IsClientForDisk)
ProfileEvents::increment(ProfileEvents::DiskAzureGetProperties);
try
{
client.GetProperties();
return true;
}
catch (const Azure::Storage::StorageException & e)
{
if (e.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound)
return false;
throw;
}
}
std::unique_ptr<ContainerClient> getContainerClient(const ConnectionParams & params, bool readonly)
{
if (params.endpoint.container_already_exists.value_or(false) || readonly)
{
return params.createForContainer();
}
if (!params.endpoint.container_already_exists.has_value())
{
auto container_client = params.createForContainer();
if (containerExists(*container_client))
return container_client;
}
try
{
auto service_client = params.createForService();
ProfileEvents::increment(ProfileEvents::AzureCreateContainer);
if (params.client_options.ClickhouseOptions.IsClientForDisk)
ProfileEvents::increment(ProfileEvents::DiskAzureCreateContainer);
return std::make_unique<ContainerClient>(service_client->CreateBlobContainer(params.endpoint.container_name).Value);
}
catch (const Azure::Storage::StorageException & e)
{
/// If container_already_exists is not set (in config), ignore already exists error.
/// (Conflict - The specified container already exists)
/// If container_already_exists is not set (in config), ignore already exists error. Conflict - The specified container already exists.
/// To avoid race with creation of container, handle this error despite that we have already checked the existence of container.
if (!params.endpoint.container_already_exists.has_value() && e.StatusCode == Azure::Core::Http::HttpStatusCode::Conflict)
return params.createForContainer();
throw;

View File

@ -86,6 +86,7 @@ private:
Poco::Timestamp::fromEpochTime(
std::chrono::duration_cast<std::chrono::seconds>(
static_cast<std::chrono::system_clock::time_point>(blob.Details.LastModified).time_since_epoch()).count()),
blob.Details.ETag.ToString(),
{}}));
}
@ -186,6 +187,7 @@ void AzureObjectStorage::listObjects(const std::string & path, RelativePathsWith
Poco::Timestamp::fromEpochTime(
std::chrono::duration_cast<std::chrono::seconds>(
static_cast<std::chrono::system_clock::time_point>(blob.Details.LastModified).time_since_epoch()).count()),
blob.Details.ETag.ToString(),
{}}));
}

View File

@ -205,7 +205,7 @@ void DiskObjectStorageMetadata::addObject(ObjectStorageKey key, size_t size)
}
total_size += size;
keys_with_meta.emplace_back(std::move(key), ObjectMetadata{size, {}, {}});
keys_with_meta.emplace_back(std::move(key), ObjectMetadata{size, {}, {}, {}});
}
ObjectKeyWithMetadata DiskObjectStorageMetadata::popLastObject()

View File

@ -222,6 +222,7 @@ void HDFSObjectStorage::listObjects(const std::string & path, RelativePathsWithM
ObjectMetadata{
static_cast<uint64_t>(ls.file_info[i].mSize),
Poco::Timestamp::fromEpochTime(ls.file_info[i].mLastMod),
"",
{}}));
}

View File

@ -54,6 +54,7 @@ struct ObjectMetadata
{
uint64_t size_bytes = 0;
Poco::Timestamp last_modified;
std::string etag;
ObjectAttributes attributes;
};

View File

@ -146,7 +146,7 @@ private:
auto objects = outcome.GetResult().GetContents();
for (const auto & object : objects)
{
ObjectMetadata metadata{static_cast<uint64_t>(object.GetSize()), Poco::Timestamp::fromEpochTime(object.GetLastModified().Seconds()), {}};
ObjectMetadata metadata{static_cast<uint64_t>(object.GetSize()), Poco::Timestamp::fromEpochTime(object.GetLastModified().Seconds()), object.GetETag(), {}};
batch.emplace_back(std::make_shared<RelativePathWithMetadata>(object.GetKey(), std::move(metadata)));
}
@ -332,6 +332,7 @@ void S3ObjectStorage::listObjects(const std::string & path, RelativePathsWithMet
ObjectMetadata{
static_cast<uint64_t>(object.GetSize()),
Poco::Timestamp::fromEpochTime(object.GetLastModified().Seconds()),
object.GetETag(),
{}}));
if (max_keys)
@ -479,6 +480,7 @@ ObjectMetadata S3ObjectStorage::getObjectMetadata(const std::string & path) cons
ObjectMetadata result;
result.size_bytes = object_info.size;
result.last_modified = Poco::Timestamp::fromEpochTime(object_info.last_modification_time);
result.etag = object_info.etag;
result.attributes = object_info.metadata;
return result;

View File

@ -7,6 +7,7 @@
#include <Common/FieldVisitorConvertToNumber.h>
#include <Common/ProfileEvents.h>
#include <Common/assert_cast.h>
#include <Common/FailPoint.h>
#include <Core/Settings.h>
#include <base/sleep.h>
#include <IO/WriteHelpers.h>
@ -32,6 +33,11 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
namespace FailPoints
{
extern const char infinite_sleep[];
}
/** sleep(seconds) - the specified number of seconds sleeps each columns.
*/
@ -107,6 +113,8 @@ public:
{
/// When sleeping, the query cannot be cancelled. For ability to cancel query, we limit sleep time.
UInt64 microseconds = static_cast<UInt64>(seconds * 1e6);
FailPointInjection::pauseFailPoint(FailPoints::infinite_sleep);
if (max_microseconds && microseconds > max_microseconds)
throw Exception(ErrorCodes::TOO_SLOW, "The maximum sleep time is {} microseconds. Requested: {} microseconds",
max_microseconds, microseconds);

View File

@ -0,0 +1,56 @@
#include "config.h"
#if USE_PROTOBUF
#include <IO/Protobuf/ProtobufZeroCopyInputStreamFromReadBuffer.h>
#include <IO/ReadBuffer.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
ProtobufZeroCopyInputStreamFromReadBuffer::ProtobufZeroCopyInputStreamFromReadBuffer(std::unique_ptr<ReadBuffer> in_) : in(std::move(in_))
{
}
ProtobufZeroCopyInputStreamFromReadBuffer::~ProtobufZeroCopyInputStreamFromReadBuffer() = default;
bool ProtobufZeroCopyInputStreamFromReadBuffer::Next(const void ** data, int * size)
{
if (in->eof())
return false;
*data = in->position();
*size = static_cast<int>(in->available());
in->position() += *size;
return true;
}
void ProtobufZeroCopyInputStreamFromReadBuffer::BackUp(int count)
{
if (static_cast<Int64>(in->offset()) < count)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"ProtobufZeroCopyInputStreamFromReadBuffer::BackUp() cannot back up {} bytes (max = {} bytes)",
count,
in->offset());
in->position() -= count;
}
bool ProtobufZeroCopyInputStreamFromReadBuffer::Skip(int count)
{
return static_cast<Int64>(in->tryIgnore(count)) == count;
}
int64_t ProtobufZeroCopyInputStreamFromReadBuffer::ByteCount() const
{
return in->count();
}
}
#endif

View File

@ -0,0 +1,38 @@
#pragma once
#include "config.h"
#if USE_PROTOBUF
#include <google/protobuf/io/zero_copy_stream.h>
namespace DB
{
class ReadBuffer;
class ProtobufZeroCopyInputStreamFromReadBuffer : public google::protobuf::io::ZeroCopyInputStream
{
public:
explicit ProtobufZeroCopyInputStreamFromReadBuffer(std::unique_ptr<ReadBuffer> in_);
~ProtobufZeroCopyInputStreamFromReadBuffer() override;
// Obtains a chunk of data from the stream.
bool Next(const void ** data, int * size) override;
// Backs up a number of bytes, so that the next call to Next() returns
// data again that was already returned by the last call to Next().
void BackUp(int count) override;
// Skips a number of bytes.
bool Skip(int count) override;
// Returns the total number of bytes read since this object was created.
int64_t ByteCount() const override;
private:
std::unique_ptr<ReadBuffer> in;
};
}
#endif

View File

@ -0,0 +1,60 @@
#include "config.h"
#if USE_PROTOBUF
#include <IO/Protobuf/ProtobufZeroCopyOutputStreamFromWriteBuffer.h>
#include <IO/WriteBuffer.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
ProtobufZeroCopyOutputStreamFromWriteBuffer::ProtobufZeroCopyOutputStreamFromWriteBuffer(WriteBuffer & out_) : out(&out_)
{
}
ProtobufZeroCopyOutputStreamFromWriteBuffer::ProtobufZeroCopyOutputStreamFromWriteBuffer(std::unique_ptr<WriteBuffer> out_)
: ProtobufZeroCopyOutputStreamFromWriteBuffer(*out_)
{
out_holder = std::move(out_);
}
ProtobufZeroCopyOutputStreamFromWriteBuffer::~ProtobufZeroCopyOutputStreamFromWriteBuffer() = default;
bool ProtobufZeroCopyOutputStreamFromWriteBuffer::Next(void ** data, int * size)
{
*data = out->position();
*size = static_cast<int>(out->available());
out->position() += *size;
return true;
}
void ProtobufZeroCopyOutputStreamFromWriteBuffer::BackUp(int count)
{
if (static_cast<Int64>(out->offset()) < count)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"ProtobufZeroCopyOutputStreamFromWriteBuffer::BackUp() cannot back up {} bytes (max = {} bytes)",
count,
out->offset());
out->position() -= count;
}
int64_t ProtobufZeroCopyOutputStreamFromWriteBuffer::ByteCount() const
{
return out->count();
}
void ProtobufZeroCopyOutputStreamFromWriteBuffer::finalize()
{
out->finalize();
}
}
#endif

View File

@ -0,0 +1,40 @@
#pragma once
#include "config.h"
#if USE_PROTOBUF
#include <google/protobuf/io/zero_copy_stream.h>
namespace DB
{
class WriteBuffer;
class ProtobufZeroCopyOutputStreamFromWriteBuffer : public google::protobuf::io::ZeroCopyOutputStream
{
public:
explicit ProtobufZeroCopyOutputStreamFromWriteBuffer(WriteBuffer & out_);
explicit ProtobufZeroCopyOutputStreamFromWriteBuffer(std::unique_ptr<WriteBuffer> out_);
~ProtobufZeroCopyOutputStreamFromWriteBuffer() override;
// Obtains a buffer into which data can be written.
bool Next(void ** data, int * size) override;
// Backs up a number of bytes, so that the end of the last buffer returned
// by Next() is not actually written.
void BackUp(int count) override;
// Returns the total number of bytes written since this object was created.
int64_t ByteCount() const override;
void finalize();
private:
WriteBuffer * out;
std::unique_ptr<WriteBuffer> out_holder;
};
}
#endif

View File

@ -54,6 +54,7 @@ namespace
ObjectInfo object_info;
object_info.size = static_cast<size_t>(result.GetContentLength());
object_info.last_modification_time = result.GetLastModified().Seconds();
object_info.etag = result.GetETag();
if (with_metadata)
object_info.metadata = result.GetMetadata();

View File

@ -15,6 +15,7 @@ struct ObjectInfo
{
size_t size = 0;
time_t last_modification_time = 0;
String etag;
std::map<String, String> metadata = {}; /// Set only if getObjectInfo() is called with `with_metadata = true`.
};

View File

@ -16,7 +16,13 @@ namespace ErrorCodes
}
SnappyWriteBuffer::SnappyWriteBuffer(std::unique_ptr<WriteBuffer> out_, size_t buf_size, char * existing_memory, size_t alignment)
: BufferWithOwnMemory<WriteBuffer>(buf_size, existing_memory, alignment), out(std::move(out_))
: SnappyWriteBuffer(*out_, buf_size, existing_memory, alignment)
{
out_holder = std::move(out_);
}
SnappyWriteBuffer::SnappyWriteBuffer(WriteBuffer & out_, size_t buf_size, char * existing_memory, size_t alignment)
: BufferWithOwnMemory<WriteBuffer>(buf_size, existing_memory, alignment), out(&out_)
{
}

View File

@ -18,6 +18,12 @@ public:
char * existing_memory = nullptr,
size_t alignment = 0);
explicit SnappyWriteBuffer(
WriteBuffer & out_,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
~SnappyWriteBuffer() override;
void finalizeImpl() override { finish(); }
@ -28,7 +34,9 @@ private:
void finishImpl();
void finish();
std::unique_ptr<WriteBuffer> out;
WriteBuffer * out;
std::unique_ptr<WriteBuffer> out_holder;
bool finished = false;
String uncompress_buffer;

View File

@ -1,6 +1,7 @@
#pragma once
#include <Common/typeid_cast.h>
#include <Parsers/ASTWithElement.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTQueryWithTableAndOutput.h>
#include <Parsers/ASTRenameQuery.h>
@ -100,6 +101,7 @@ private:
const String database_name;
std::set<String> external_tables;
mutable std::unordered_set<String> with_aliases;
bool only_replace_current_database_function = false;
bool only_replace_in_join = false;
@ -117,6 +119,10 @@ private:
void visit(ASTSelectQuery & select, ASTPtr &) const
{
if (select.recursive_with)
for (const auto & child : select.with()->children)
with_aliases.insert(child->as<ASTWithElement>()->name);
if (select.tables())
tryVisit<ASTTablesInSelectQuery>(select.refTables());
@ -165,6 +171,9 @@ private:
/// There is temporary table with such name, should not be rewritten.
if (external_tables.contains(identifier.shortName()))
return;
/// This is WITH RECURSIVE alias.
if (with_aliases.contains(identifier.name()))
return;
auto qualified_identifier = std::make_shared<ASTTableIdentifier>(database_name, identifier.name());
if (!identifier.alias.empty())

View File

@ -3307,6 +3307,17 @@ void NO_INLINE Aggregator::destroyImpl(Table & table) const
data = nullptr;
});
if constexpr (Method::low_cardinality_optimization || Method::one_key_nullable_optimization)
{
if (table.getNullKeyData() != nullptr)
{
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->destroy(table.getNullKeyData() + offsets_of_aggregate_states[i]);
table.getNullKeyData() = nullptr;
}
}
}

View File

@ -254,6 +254,8 @@ String toString(ClientInfo::Interface interface)
return "LOCAL";
case ClientInfo::Interface::TCP_INTERSERVER:
return "TCP_INTERSERVER";
case ClientInfo::Interface::PROMETHEUS:
return "PROMETHEUS";
}
return std::format("Unknown server interface ({}).", static_cast<int>(interface));

View File

@ -38,6 +38,7 @@ public:
POSTGRESQL = 5,
LOCAL = 6,
TCP_INTERSERVER = 7,
PROMETHEUS = 8,
};
enum class HTTPMethod : uint8_t

View File

@ -38,6 +38,7 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/parseAggregateFunctionParameters.h>
#include <AggregateFunctions/WindowFunction.h>
#include <Storages/StorageDistributed.h>
#include <Storages/StorageDictionary.h>
@ -590,6 +591,7 @@ void ExpressionAnalyzer::makeAggregateDescriptions(ActionsDAG & actions, Aggrega
void ExpressionAnalyzer::makeWindowDescriptionFromAST(const Context & context_,
const WindowDescriptions & existing_descriptions,
AggregateFunctionPtr aggregate_function,
WindowDescription & desc, const IAST * ast)
{
const auto & definition = ast->as<const ASTWindowDefinition &>();
@ -698,7 +700,21 @@ void ExpressionAnalyzer::makeWindowDescriptionFromAST(const Context & context_,
ast->formatForErrorMessage());
}
const auto * window_function = aggregate_function ? dynamic_cast<const IWindowFunction *>(aggregate_function.get()) : nullptr;
desc.frame.is_default = definition.frame_is_default;
if (desc.frame.is_default && window_function)
{
auto default_window_frame_opt = window_function->getDefaultFrame();
if (default_window_frame_opt)
{
desc.frame = *default_window_frame_opt;
/// Append the default frame description to window_name, make sure it will be put into
/// a proper window description.
desc.window_name += " " + desc.frame.toString();
return;
}
}
desc.frame.type = definition.frame_type;
desc.frame.begin_type = definition.frame_begin_type;
desc.frame.begin_preceding = definition.frame_begin_preceding;
@ -734,7 +750,7 @@ void ExpressionAnalyzer::makeWindowDescriptions(ActionsDAG & actions)
WindowDescription desc;
desc.window_name = elem.name;
makeWindowDescriptionFromAST(*current_context, window_descriptions,
desc, elem.definition.get());
nullptr, desc, elem.definition.get());
auto [it, inserted] = window_descriptions.insert(
{elem.name, std::move(desc)});
@ -821,12 +837,12 @@ void ExpressionAnalyzer::makeWindowDescriptions(ActionsDAG & actions)
WindowDescription desc;
desc.window_name = default_window_name;
makeWindowDescriptionFromAST(*current_context, window_descriptions,
desc, &definition);
window_function.aggregate_function, desc, &definition);
auto full_sort_description = desc.full_sort_description;
auto [it, inserted] = window_descriptions.insert(
{default_window_name, std::move(desc)});
{desc.window_name, std::move(desc)});
if (!inserted)
{

View File

@ -135,7 +135,12 @@ public:
/// A list of windows for window functions.
const WindowDescriptions & windowDescriptions() const { return window_descriptions; }
void makeWindowDescriptionFromAST(const Context & context, const WindowDescriptions & existing_descriptions, WindowDescription & desc, const IAST * ast);
void makeWindowDescriptionFromAST(
const Context & context,
const WindowDescriptions & existing_descriptions,
AggregateFunctionPtr aggregate_function,
WindowDescription & desc,
const IAST * ast);
void makeWindowDescriptions(ActionsDAG & actions);
/** Checks if subquery is not a plain StorageSet.

View File

@ -648,10 +648,8 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
return table_join->sizeLimits().check(total_rows, total_bytes, "JOIN", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED);
}
void HashJoin::shrinkStoredBlocksToFit(size_t & total_bytes_in_join)
void HashJoin::shrinkStoredBlocksToFit(size_t & total_bytes_in_join, bool force_optimize)
{
if (shrink_blocks)
return; /// Already shrunk
Int64 current_memory_usage = getCurrentQueryMemoryUsage();
Int64 query_memory_usage_delta = current_memory_usage - memory_usage_before_adding_blocks;
@ -659,15 +657,21 @@ void HashJoin::shrinkStoredBlocksToFit(size_t & total_bytes_in_join)
auto max_total_bytes_in_join = table_join->sizeLimits().max_bytes;
/** If accounted data size is more than half of `max_bytes_in_join`
* or query memory consumption growth from the beginning of adding blocks (estimation of memory consumed by join using memory tracker)
* is bigger than half of all memory available for query,
* then shrink stored blocks to fit.
*/
shrink_blocks = (max_total_bytes_in_join && total_bytes_in_join > max_total_bytes_in_join / 2) ||
(max_total_bytes_for_query && query_memory_usage_delta > max_total_bytes_for_query / 2);
if (!shrink_blocks)
return;
if (!force_optimize)
{
if (shrink_blocks)
return; /// Already shrunk
/** If accounted data size is more than half of `max_bytes_in_join`
* or query memory consumption growth from the beginning of adding blocks (estimation of memory consumed by join using memory tracker)
* is bigger than half of all memory available for query,
* then shrink stored blocks to fit.
*/
shrink_blocks = (max_total_bytes_in_join && total_bytes_in_join > max_total_bytes_in_join / 2) ||
(max_total_bytes_for_query && query_memory_usage_delta > max_total_bytes_for_query / 2);
if (!shrink_blocks)
return;
}
LOG_DEBUG(log, "Shrinking stored blocks, memory consumption is {} {} calculated by join, {} {} by memory tracker",
ReadableSize(total_bytes_in_join), max_total_bytes_in_join ? fmt::format("/ {}", ReadableSize(max_total_bytes_in_join)) : "",

View File

@ -372,7 +372,7 @@ public:
void debugKeys() const;
void shrinkStoredBlocksToFit(size_t & total_bytes_in_join);
void shrinkStoredBlocksToFit(size_t & total_bytes_in_join, bool force_optimize = false);
void setMaxJoinedBlockRows(size_t value) { max_joined_block_rows = value; }

View File

@ -37,6 +37,7 @@
#include <Storages/StorageFactory.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/StorageTimeSeries.h>
#include <Storages/WindowView/StorageWindowView.h>
#include <Interpreters/Context.h>
@ -751,6 +752,10 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::getTableProperti
if (create.storage && create.storage->engine)
getContext()->checkAccess(AccessType::TABLE_ENGINE, create.storage->engine->name);
/// If this is a TimeSeries table then we need to normalize list of columns (add missing columns and reorder), and also set inner table engines.
if (create.is_time_series_table && (mode < LoadingStrictnessLevel::ATTACH))
StorageTimeSeries::normalizeTableDefinition(create, getContext());
TableProperties properties;
TableLockHolder as_storage_lock;
@ -951,12 +956,40 @@ namespace
engine_ast->no_empty_args = true;
storage.set(storage.engine, engine_ast);
}
void setNullTableEngine(ASTStorage & storage)
{
auto engine_ast = std::make_shared<ASTFunction>();
engine_ast->name = "Null";
engine_ast->no_empty_args = true;
storage.set(storage.engine, engine_ast);
}
}
void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
{
if (create.as_table_function)
{
if (getContext()->getSettingsRef().restore_replace_external_table_functions_to_null)
{
const auto & factory = TableFunctionFactory::instance();
auto properties = factory.tryGetProperties(create.as_table_function->as<ASTFunction>()->name);
if (properties && properties->allow_readonly)
return;
if (!create.storage)
{
auto storage_ast = std::make_shared<ASTStorage>();
create.set(create.storage, storage_ast);
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Storage should not be created yet, it's a bug.");
create.as_table_function = nullptr;
setNullTableEngine(*create.storage);
}
return;
}
if (create.is_dictionary || create.is_ordinary_view || create.is_live_view || create.is_window_view)
return;
@ -1007,6 +1040,13 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
/// Some part of storage definition (such as PARTITION BY) is specified, but ENGINE is not: just set default one.
setDefaultTableEngine(*create.storage, getContext()->getSettingsRef().default_table_engine.value);
}
/// For external tables with restore_replace_external_engine_to_null setting we replace external engines to
/// Null table engine.
else if (getContext()->getSettingsRef().restore_replace_external_engines_to_null)
{
if (StorageFactory::instance().getStorageFeatures(create.storage->engine->name).source_access_type != AccessType::NONE)
setNullTableEngine(*create.storage);
}
return;
}
@ -1058,6 +1098,7 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
else if (as_create.storage)
{
storage_def = typeid_cast<std::shared_ptr<ASTStorage>>(as_create.storage->ptr());
create.is_time_series_table = as_create.is_time_series_table;
}
else
{

View File

@ -105,9 +105,10 @@ ColumnsDescription SessionLogElement::getColumnsDescription()
{"MySQL", static_cast<Int8>(Interface::MYSQL)},
{"PostgreSQL", static_cast<Int8>(Interface::POSTGRESQL)},
{"Local", static_cast<Int8>(Interface::LOCAL)},
{"TCP_Interserver", static_cast<Int8>(Interface::TCP_INTERSERVER)}
{"TCP_Interserver", static_cast<Int8>(Interface::TCP_INTERSERVER)},
{"Prometheus", static_cast<Int8>(Interface::PROMETHEUS)},
});
static_assert(magic_enum::enum_count<Interface>() == 7);
static_assert(magic_enum::enum_count<Interface>() == 8);
auto lc_string_datatype = std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());

View File

@ -106,6 +106,17 @@ Chunk Squashing::convertToChunk(CurrentData && data) const
Chunk Squashing::squash(std::vector<Chunk> && input_chunks, Chunk::ChunkInfoCollection && infos)
{
if (input_chunks.size() == 1)
{
/// this is just optimization, no logic changes
Chunk result = std::move(input_chunks.front());
infos.appendIfUniq(std::move(result.getChunkInfos()));
result.setChunkInfos(infos);
chassert(result);
return result;
}
std::vector<IColumn::MutablePtr> mutable_columns = {};
size_t rows = 0;
for (const Chunk & chunk : input_chunks)

View File

@ -1158,7 +1158,8 @@ bool TreeRewriterResult::collectUsedColumns(const ASTPtr & query, bool is_select
}
}
has_virtual_shard_num = is_remote_storage && storage->isVirtualColumn("_shard_num", storage_snapshot->getMetadataForQuery()) && virtuals->has("_shard_num");
has_virtual_shard_num
= is_remote_storage && storage->isVirtualColumn("_shard_num", storage_snapshot->metadata) && virtuals->has("_shard_num");
}
/// Collect missed object subcolumns

View File

@ -18,6 +18,7 @@
#include <Storages/ColumnsDescription.h>
#include <DataTypes/NestedUtils.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnConst.h>
#include <DataTypes/DataTypeArray.h>
#include <Storages/StorageInMemoryMetadata.h>
@ -35,8 +36,13 @@ namespace
/// Add all required expressions for missing columns calculation
void addDefaultRequiredExpressionsRecursively(
const Block & block, const String & required_column_name, DataTypePtr required_column_type,
const ColumnsDescription & columns, ASTPtr default_expr_list_accum, NameSet & added_columns, bool null_as_default)
const Block & block,
const String & required_column_name,
DataTypePtr required_column_type,
const ColumnsDescription & columns,
ASTPtr default_expr_list_accum,
NameSet & added_columns,
bool null_as_default)
{
checkStackSize();
@ -271,6 +277,53 @@ static std::unordered_map<String, ColumnPtr> collectOffsetsColumns(
return offsets_columns;
}
static ColumnPtr createColumnWithDefaultValue(const IDataType & data_type, const String & subcolumn_name, size_t num_rows)
{
auto column = data_type.createColumnConstWithDefaultValue(num_rows);
/// We must turn a constant column into a full column because the interpreter could infer
/// that it is constant everywhere but in some blocks (from other parts) it can be a full column.
if (subcolumn_name.empty())
return column->convertToFullColumnIfConst();
/// Firstly get subcolumn from const column and then replicate.
column = assert_cast<const ColumnConst &>(*column).getDataColumnPtr();
column = data_type.getSubcolumn(subcolumn_name, column);
return ColumnConst::create(std::move(column), num_rows)->convertToFullColumnIfConst();
}
static bool hasDefault(const StorageMetadataPtr & metadata_snapshot, const NameAndTypePair & column)
{
if (!metadata_snapshot)
return false;
const auto & columns = metadata_snapshot->getColumns();
if (columns.has(column.name))
return columns.hasDefault(column.name);
auto name_in_storage = column.getNameInStorage();
return columns.hasDefault(name_in_storage);
}
static String removeTupleElementsFromSubcolumn(String subcolumn_name, const Names & tuple_elements)
{
/// Add a dot to the end of name for convenience.
subcolumn_name += ".";
for (const auto & elem : tuple_elements)
{
auto pos = subcolumn_name.find(elem + ".");
if (pos != std::string::npos)
subcolumn_name.erase(pos, elem.size() + 1);
}
if (subcolumn_name.ends_with("."))
subcolumn_name.pop_back();
return subcolumn_name;
}
void fillMissingColumns(
Columns & res_columns,
size_t num_rows,
@ -296,21 +349,17 @@ void fillMissingColumns(
auto requested_column = requested_columns.begin();
for (size_t i = 0; i < num_columns; ++i, ++requested_column)
{
const auto & [name, type] = *requested_column;
if (res_columns[i] && partially_read_columns.contains(name))
if (res_columns[i] && partially_read_columns.contains(requested_column->name))
res_columns[i] = nullptr;
if (res_columns[i])
continue;
if (metadata_snapshot && metadata_snapshot->getColumns().hasDefault(name))
/// Nothing to fill or default should be filled in evaluateMissingDefaults
if (res_columns[i] || hasDefault(metadata_snapshot, *requested_column))
continue;
std::vector<ColumnPtr> current_offsets;
size_t num_dimensions = 0;
const auto * array_type = typeid_cast<const DataTypeArray *>(type.get());
const auto * array_type = typeid_cast<const DataTypeArray *>(requested_column->type.get());
if (array_type && !offsets_columns.empty())
{
num_dimensions = getNumberOfDimensions(*array_type);
@ -345,20 +394,34 @@ void fillMissingColumns(
if (!current_offsets.empty())
{
size_t num_empty_dimensions = num_dimensions - current_offsets.size();
auto scalar_type = createArrayOfType(getBaseTypeOfArray(type), num_empty_dimensions);
Names tuple_elements;
auto serialization = IDataType::getSerialization(*requested_column);
/// For Nested columns collect names of tuple elements and skip them while getting the base type of array.
IDataType::forEachSubcolumn([&](const auto & path, const auto &, const auto &)
{
if (path.back().type == ISerialization::Substream::TupleElement)
tuple_elements.push_back(path.back().name_of_substream);
}, ISerialization::SubstreamData(serialization));
/// The number of dimensions that belongs to the array itself but not shared in Nested column.
/// For example for column "n Nested(a UInt64, b Array(UInt64))" this value is 0 for `n.a` and 1 for `n.b`.
size_t num_empty_dimensions = num_dimensions - current_offsets.size();
auto base_type = getBaseTypeOfArray(requested_column->getTypeInStorage(), tuple_elements);
auto scalar_type = createArrayOfType(base_type, num_empty_dimensions);
size_t data_size = assert_cast<const ColumnUInt64 &>(*current_offsets.back()).getData().back();
res_columns[i] = scalar_type->createColumnConstWithDefaultValue(data_size)->convertToFullColumnIfConst();
/// Remove names of tuple elements because they are already processed by 'getBaseTypeOfArray'.
auto subcolumn_name = removeTupleElementsFromSubcolumn(requested_column->getSubcolumnName(), tuple_elements);
res_columns[i] = createColumnWithDefaultValue(*scalar_type, subcolumn_name, data_size);
for (auto it = current_offsets.rbegin(); it != current_offsets.rend(); ++it)
res_columns[i] = ColumnArray::create(res_columns[i], *it);
}
else
{
/// We must turn a constant column into a full column because the interpreter could infer
/// that it is constant everywhere but in some blocks (from other parts) it can be a full column.
res_columns[i] = type->createColumnConstWithDefaultValue(num_rows)->convertToFullColumnIfConst();
res_columns[i] = createColumnWithDefaultValue(*requested_column->getTypeInStorage(), requested_column->getSubcolumnName(), num_rows);
}
}
}

View File

@ -483,6 +483,13 @@ void ASTCreateQuery::formatQueryImpl(const FormatSettings & settings, FormatStat
if (auto to_storage = getTargetInnerEngine(ViewTarget::To))
to_storage->formatImpl(settings, state, frame);
if (targets)
{
targets->formatTarget(ViewTarget::Data, settings, state, frame);
targets->formatTarget(ViewTarget::Tags, settings, state, frame);
targets->formatTarget(ViewTarget::Metrics, settings, state, frame);
}
if (dictionary)
dictionary->formatImpl(settings, state, frame);

View File

@ -97,6 +97,7 @@ public:
bool is_materialized_view{false};
bool is_live_view{false};
bool is_window_view{false};
bool is_time_series_table{false}; /// CREATE TABLE ... ENGINE=TimeSeries() ...
bool is_populate{false};
bool is_create_empty{false}; /// CREATE TABLE ... EMPTY AS SELECT ...
bool replace_view{false}; /// CREATE OR REPLACE VIEW

View File

@ -21,6 +21,9 @@ std::string_view toString(ViewTarget::Kind kind)
{
case ViewTarget::To: return "to";
case ViewTarget::Inner: return "inner";
case ViewTarget::Data: return "data";
case ViewTarget::Tags: return "tags";
case ViewTarget::Metrics: return "metrics";
}
throw Exception(ErrorCodes::LOGICAL_ERROR, "{} doesn't support kind {}", __FUNCTION__, kind);
}
@ -254,6 +257,9 @@ std::optional<Keyword> ASTViewTargets::getKeywordForTableID(ViewTarget::Kind kin
{
case ViewTarget::To: return Keyword::TO; /// TO mydb.mydata
case ViewTarget::Inner: return std::nullopt;
case ViewTarget::Data: return Keyword::DATA; /// DATA mydb.mydata
case ViewTarget::Tags: return Keyword::TAGS; /// TAGS mydb.mytags
case ViewTarget::Metrics: return Keyword::METRICS; /// METRICS mydb.mymetrics
}
UNREACHABLE();
}
@ -264,6 +270,9 @@ std::optional<Keyword> ASTViewTargets::getKeywordForInnerStorage(ViewTarget::Kin
{
case ViewTarget::To: return std::nullopt; /// ENGINE = MergeTree()
case ViewTarget::Inner: return Keyword::INNER; /// INNER ENGINE = MergeTree()
case ViewTarget::Data: return Keyword::DATA; /// DATA ENGINE = MergeTree()
case ViewTarget::Tags: return Keyword::TAGS; /// TAGS ENGINE = MergeTree()
case ViewTarget::Metrics: return Keyword::METRICS; /// METRICS ENGINE = MergeTree()
}
UNREACHABLE();
}
@ -274,6 +283,9 @@ std::optional<Keyword> ASTViewTargets::getKeywordForInnerUUID(ViewTarget::Kind k
{
case ViewTarget::To: return Keyword::TO_INNER_UUID; /// TO INNER UUID 'XXX'
case ViewTarget::Inner: return std::nullopt;
case ViewTarget::Data: return Keyword::DATA_INNER_UUID; /// DATA INNER UUID 'XXX'
case ViewTarget::Tags: return Keyword::TAGS_INNER_UUID; /// TAGS INNER UUID 'XXX'
case ViewTarget::Metrics: return Keyword::METRICS_INNER_UUID; /// METRICS INNER UUID 'XXX'
}
UNREACHABLE();
}

View File

@ -9,7 +9,7 @@ namespace DB
class ASTStorage;
enum class Keyword : size_t;
/// Information about target tables (external or inner) of a materialized view or a window view.
/// Information about target tables (external or inner) of a materialized view or a window view or a TimeSeries table.
/// See ASTViewTargets for more details.
struct ViewTarget
{
@ -24,6 +24,15 @@ struct ViewTarget
/// If `kind == ViewTarget::Inner` then `ViewTarget` contains information about the "INNER" table of a window view:
/// CREATE WINDOW VIEW db.wv_name {INNER ENGINE inner_engine} AS SELECT ...
Inner,
/// The "data" table for a TimeSeries table, contains time series.
Data,
/// The "tags" table for a TimeSeries table, contains identifiers for each combination of a metric name and tags (labels).
Tags,
/// The "metrics" table for a TimeSeries table, contains general information (metadata) about metrics.
Metrics,
};
Kind kind = To;

View File

@ -116,6 +116,8 @@ namespace DB
MR_MACROS(CURRENT_TRANSACTION, "CURRENT TRANSACTION") \
MR_MACROS(CURRENTUSER, "CURRENTUSER") \
MR_MACROS(D, "D") \
MR_MACROS(DATA, "DATA") \
MR_MACROS(DATA_INNER_UUID, "DATA INNER UUID") \
MR_MACROS(DATABASE, "DATABASE") \
MR_MACROS(DATABASES, "DATABASES") \
MR_MACROS(DATE, "DATE") \
@ -288,6 +290,8 @@ namespace DB
MR_MACROS(MCS, "MCS") \
MR_MACROS(MEMORY, "MEMORY") \
MR_MACROS(MERGES, "MERGES") \
MR_MACROS(METRICS, "METRICS") \
MR_MACROS(METRICS_INNER_UUID, "METRICS INNER UUID") \
MR_MACROS(MI, "MI") \
MR_MACROS(MICROSECOND, "MICROSECOND") \
MR_MACROS(MICROSECONDS, "MICROSECONDS") \
@ -464,6 +468,8 @@ namespace DB
MR_MACROS(TABLE_OVERRIDE, "TABLE OVERRIDE") \
MR_MACROS(TABLE, "TABLE") \
MR_MACROS(TABLES, "TABLES") \
MR_MACROS(TAGS, "TAGS") \
MR_MACROS(TAGS_INNER_UUID, "TAGS INNER UUID") \
MR_MACROS(TEMPORARY_TABLE, "TEMPORARY TABLE") \
MR_MACROS(TEMPORARY, "TEMPORARY") \
MR_MACROS(TEST, "TEST") \

View File

@ -45,6 +45,13 @@ CreateQueryUUIDs::CreateQueryUUIDs(const ASTCreateQuery & query, bool generate_r
/// then MV will create inner table. We should generate UUID of inner table here.
if (query.is_materialized_view)
generate_target_uuid(ViewTarget::To);
if (query.is_time_series_table)
{
generate_target_uuid(ViewTarget::Data);
generate_target_uuid(ViewTarget::Tags);
generate_target_uuid(ViewTarget::Metrics);
}
}
}
}

View File

@ -696,6 +696,7 @@ bool ParserCreateTableQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expe
ASTPtr table;
ASTPtr columns_list;
std::shared_ptr<ASTStorage> storage;
bool is_time_series_table = false;
ASTPtr targets;
ASTPtr as_database;
ASTPtr as_table;
@ -784,6 +785,13 @@ bool ParserCreateTableQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expe
return false;
storage = typeid_cast<std::shared_ptr<ASTStorage>>(ast);
if (storage && storage->engine && (storage->engine->name == "TimeSeries"))
{
is_time_series_table = true;
ParserViewTargets({ViewTarget::Data, ViewTarget::Tags, ViewTarget::Metrics}).parse(pos, targets, expected);
}
return true;
};
@ -873,6 +881,7 @@ bool ParserCreateTableQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expe
query->create_or_replace = or_replace;
query->if_not_exists = if_not_exists;
query->temporary = is_temporary;
query->is_time_series_table = is_time_series_table;
query->database = table_id->getDatabase();
query->table = table_id->getTable();

Some files were not shown because too many files have changed in this diff Show More