Merge remote-tracking branch 'upstream/master' into HEAD

This commit is contained in:
Anton Popov 2023-12-11 14:14:25 +00:00
commit b5c2623b3c
197 changed files with 2735 additions and 1856 deletions

View File

@ -205,6 +205,12 @@ jobs:
with:
build_name: binary_amd64_compat
checkout_depth: 0
BuilderBinAmd64Musl:
needs: [DockerHubPush]
uses: ./.github/workflows/reusable_build.yml
with:
build_name: binary_amd64_musl
checkout_depth: 0
BuilderBinAarch64V80Compat:
needs: [DockerHubPush]
uses: ./.github/workflows/reusable_build.yml

View File

@ -242,6 +242,11 @@ jobs:
uses: ./.github/workflows/reusable_build.yml
with:
build_name: binary_amd64_compat
BuilderBinAmd64Musl:
needs: [FastTest, StyleCheck]
uses: ./.github/workflows/reusable_build.yml
with:
build_name: binary_amd64_musl
BuilderBinAarch64V80Compat:
needs: [FastTest, StyleCheck]
uses: ./.github/workflows/reusable_build.yml

View File

@ -30,7 +30,6 @@ int __gai_sigqueue(int sig, const union sigval val, pid_t caller_pid)
}
#include <sys/select.h>
#include <stdlib.h>
#include <features.h>

View File

@ -55,7 +55,6 @@ set (SRCS
src/DigestStream.cpp
src/DirectoryIterator.cpp
src/DirectoryIteratorStrategy.cpp
src/DirectoryWatcher.cpp
src/Environment.cpp
src/Error.cpp
src/ErrorHandler.cpp

View File

@ -1,228 +0,0 @@
//
// DirectoryWatcher.h
//
// Library: Foundation
// Package: Filesystem
// Module: DirectoryWatcher
//
// Definition of the DirectoryWatcher class.
//
// Copyright (c) 2012, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// SPDX-License-Identifier: BSL-1.0
//
#ifndef Foundation_DirectoryWatcher_INCLUDED
#define Foundation_DirectoryWatcher_INCLUDED
#include "Poco/Foundation.h"
#ifndef POCO_NO_INOTIFY
# include "Poco/AtomicCounter.h"
# include "Poco/BasicEvent.h"
# include "Poco/File.h"
# include "Poco/Runnable.h"
# include "Poco/Thread.h"
namespace Poco
{
class DirectoryWatcherStrategy;
class Foundation_API DirectoryWatcher : protected Runnable
/// This class is used to get notifications about changes
/// to the filesystem, more specifically, to a specific
/// directory. Changes to a directory are reported via
/// events.
///
/// A thread will be created that watches the specified
/// directory for changes. Events are reported in the context
/// of this thread.
///
/// Note that changes to files in subdirectories of the watched
/// directory are not reported. Separate DirectoryWatcher objects
/// must be created for these directories if they should be watched.
///
/// Changes to file attributes are not reported.
///
/// On Windows, this class is implemented using FindFirstChangeNotification()/FindNextChangeNotification().
/// On Linux, this class is implemented using inotify.
/// On FreeBSD and Darwin (Mac OS X, iOS), this class uses kevent/kqueue.
/// On all other platforms, the watched directory is periodically scanned
/// for changes. This can negatively affect performance if done too often.
/// Therefore, the interval in which scans are done can be specified in
/// the constructor. Note that periodic scanning will also be done on FreeBSD
/// and Darwin if events for changes to files (DW_ITEM_MODIFIED) are enabled.
///
/// DW_ITEM_MOVED_FROM and DW_ITEM_MOVED_TO events will only be reported
/// on Linux. On other platforms, a file rename or move operation
/// will be reported via a DW_ITEM_REMOVED and a DW_ITEM_ADDED event.
/// The order of these two events is not defined.
///
/// An event mask can be specified to enable only certain events.
{
public:
enum DirectoryEventType
{
DW_ITEM_ADDED = 1,
/// A new item has been created and added to the directory.
DW_ITEM_REMOVED = 2,
/// An item has been removed from the directory.
DW_ITEM_MODIFIED = 4,
/// An item has been modified.
DW_ITEM_MOVED_FROM = 8,
/// An item has been renamed or moved. This event delivers the old name.
DW_ITEM_MOVED_TO = 16
/// An item has been renamed or moved. This event delivers the new name.
};
enum DirectoryEventMask
{
DW_FILTER_ENABLE_ALL = 31,
/// Enables all event types.
DW_FILTER_DISABLE_ALL = 0
/// Disables all event types.
};
enum
{
DW_DEFAULT_SCAN_INTERVAL = 5 /// Default scan interval for platforms that don't provide a native notification mechanism.
};
struct DirectoryEvent
{
DirectoryEvent(const File & f, DirectoryEventType ev) : item(f), event(ev) { }
const File & item; /// The directory or file that has been changed.
DirectoryEventType event; /// The kind of event.
};
BasicEvent<const DirectoryEvent> itemAdded;
/// Fired when a file or directory has been created or added to the directory.
BasicEvent<const DirectoryEvent> itemRemoved;
/// Fired when a file or directory has been removed from the directory.
BasicEvent<const DirectoryEvent> itemModified;
/// Fired when a file or directory has been modified.
BasicEvent<const DirectoryEvent> itemMovedFrom;
/// Fired when a file or directory has been renamed. This event delivers the old name.
BasicEvent<const DirectoryEvent> itemMovedTo;
/// Fired when a file or directory has been moved. This event delivers the new name.
BasicEvent<const Exception> scanError;
/// Fired when an error occurs while scanning for changes.
DirectoryWatcher(const std::string & path, int eventMask = DW_FILTER_ENABLE_ALL, int scanInterval = DW_DEFAULT_SCAN_INTERVAL);
/// Creates a DirectoryWatcher for the directory given in path.
/// To enable only specific events, an eventMask can be specified by
/// OR-ing the desired event IDs (e.g., DW_ITEM_ADDED | DW_ITEM_MODIFIED).
/// On platforms where no native filesystem notifications are available,
/// scanInterval specifies the interval in seconds between scans
/// of the directory.
DirectoryWatcher(const File & directory, int eventMask = DW_FILTER_ENABLE_ALL, int scanInterval = DW_DEFAULT_SCAN_INTERVAL);
/// Creates a DirectoryWatcher for the specified directory
/// To enable only specific events, an eventMask can be specified by
/// OR-ing the desired event IDs (e.g., DW_ITEM_ADDED | DW_ITEM_MODIFIED).
/// On platforms where no native filesystem notifications are available,
/// scanInterval specifies the interval in seconds between scans
/// of the directory.
~DirectoryWatcher();
/// Destroys the DirectoryWatcher.
void suspendEvents();
/// Suspends sending of events. Can be called multiple times, but every
/// call to suspendEvent() must be matched by a call to resumeEvents().
void resumeEvents();
/// Resumes events, after they have been suspended with a call to suspendEvents().
bool eventsSuspended() const;
/// Returns true iff events are suspended.
int eventMask() const;
/// Returns the value of the eventMask passed to the constructor.
int scanInterval() const;
/// Returns the scan interval in seconds.
const File & directory() const;
/// Returns the directory being watched.
bool supportsMoveEvents() const;
/// Returns true iff the platform supports DW_ITEM_MOVED_FROM/itemMovedFrom and
/// DW_ITEM_MOVED_TO/itemMovedTo events.
protected:
void init();
void stop();
void run();
private:
DirectoryWatcher();
DirectoryWatcher(const DirectoryWatcher &);
DirectoryWatcher & operator=(const DirectoryWatcher &);
Thread _thread;
File _directory;
int _eventMask;
AtomicCounter _eventsSuspended;
int _scanInterval;
DirectoryWatcherStrategy * _pStrategy;
};
//
// inlines
//
inline bool DirectoryWatcher::eventsSuspended() const
{
return _eventsSuspended.value() > 0;
}
inline int DirectoryWatcher::eventMask() const
{
return _eventMask;
}
inline int DirectoryWatcher::scanInterval() const
{
return _scanInterval;
}
inline const File & DirectoryWatcher::directory() const
{
return _directory;
}
} // namespace Poco
#endif // POCO_NO_INOTIFY
#endif // Foundation_DirectoryWatcher_INCLUDED

View File

@ -1,602 +0,0 @@
//
// DirectoryWatcher.cpp
//
// Library: Foundation
// Package: Filesystem
// Module: DirectoryWatcher
//
// Copyright (c) 2012, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// SPDX-License-Identifier: BSL-1.0
//
#include "Poco/DirectoryWatcher.h"
#ifndef POCO_NO_INOTIFY
#include "Poco/Path.h"
#include "Poco/Glob.h"
#include "Poco/DirectoryIterator.h"
#include "Poco/Event.h"
#include "Poco/Exception.h"
#include "Poco/Buffer.h"
#if POCO_OS == POCO_OS_LINUX || POCO_OS == POCO_OS_ANDROID
#include <sys/inotify.h>
#include <sys/select.h>
#include <unistd.h>
#elif POCO_OS == POCO_OS_MAC_OS_X || POCO_OS == POCO_OS_FREE_BSD
#include <fcntl.h>
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <unistd.h>
#if (POCO_OS == POCO_OS_FREE_BSD) && !defined(O_EVTONLY)
#define O_EVTONLY 0x8000
#endif
#endif
#include <algorithm>
#include <atomic>
#include <map>
namespace Poco {
class DirectoryWatcherStrategy
{
public:
DirectoryWatcherStrategy(DirectoryWatcher& owner):
_owner(owner)
{
}
virtual ~DirectoryWatcherStrategy()
{
}
DirectoryWatcher& owner()
{
return _owner;
}
virtual void run() = 0;
virtual void stop() = 0;
virtual bool supportsMoveEvents() const = 0;
protected:
struct ItemInfo
{
ItemInfo():
size(0)
{
}
ItemInfo(const ItemInfo& other):
path(other.path),
size(other.size),
lastModified(other.lastModified)
{
}
explicit ItemInfo(const File& f):
path(f.path()),
size(f.isFile() ? f.getSize() : 0),
lastModified(f.getLastModified())
{
}
std::string path;
File::FileSize size;
Timestamp lastModified;
};
typedef std::map<std::string, ItemInfo> ItemInfoMap;
void scan(ItemInfoMap& entries)
{
DirectoryIterator it(owner().directory());
DirectoryIterator end;
while (it != end)
{
entries[it.path().getFileName()] = ItemInfo(*it);
++it;
}
}
void compare(ItemInfoMap& oldEntries, ItemInfoMap& newEntries)
{
for (ItemInfoMap::iterator itn = newEntries.begin(); itn != newEntries.end(); ++itn)
{
ItemInfoMap::iterator ito = oldEntries.find(itn->first);
if (ito != oldEntries.end())
{
if ((owner().eventMask() & DirectoryWatcher::DW_ITEM_MODIFIED) && !owner().eventsSuspended())
{
if (itn->second.size != ito->second.size || itn->second.lastModified != ito->second.lastModified)
{
Poco::File f(itn->second.path);
DirectoryWatcher::DirectoryEvent ev(f, DirectoryWatcher::DW_ITEM_MODIFIED);
owner().itemModified(&owner(), ev);
}
}
oldEntries.erase(ito);
}
else if ((owner().eventMask() & DirectoryWatcher::DW_ITEM_ADDED) && !owner().eventsSuspended())
{
Poco::File f(itn->second.path);
DirectoryWatcher::DirectoryEvent ev(f, DirectoryWatcher::DW_ITEM_ADDED);
owner().itemAdded(&owner(), ev);
}
}
if ((owner().eventMask() & DirectoryWatcher::DW_ITEM_REMOVED) && !owner().eventsSuspended())
{
for (ItemInfoMap::iterator it = oldEntries.begin(); it != oldEntries.end(); ++it)
{
Poco::File f(it->second.path);
DirectoryWatcher::DirectoryEvent ev(f, DirectoryWatcher::DW_ITEM_REMOVED);
owner().itemRemoved(&owner(), ev);
}
}
}
private:
DirectoryWatcherStrategy();
DirectoryWatcherStrategy(const DirectoryWatcherStrategy&);
DirectoryWatcherStrategy& operator = (const DirectoryWatcherStrategy&);
DirectoryWatcher& _owner;
};
#if POCO_OS == POCO_OS_WINDOWS_NT
class WindowsDirectoryWatcherStrategy: public DirectoryWatcherStrategy
{
public:
WindowsDirectoryWatcherStrategy(DirectoryWatcher& owner):
DirectoryWatcherStrategy(owner)
{
_hStopped = CreateEventW(NULL, FALSE, FALSE, NULL);
if (!_hStopped)
throw SystemException("cannot create event");
}
~WindowsDirectoryWatcherStrategy()
{
CloseHandle(_hStopped);
}
void run()
{
ItemInfoMap entries;
scan(entries);
DWORD filter = FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_DIR_NAME;
if (owner().eventMask() & DirectoryWatcher::DW_ITEM_MODIFIED)
filter |= FILE_NOTIFY_CHANGE_SIZE | FILE_NOTIFY_CHANGE_LAST_WRITE;
std::string path(owner().directory().path());
HANDLE hChange = FindFirstChangeNotificationA(path.c_str(), FALSE, filter);
if (hChange == INVALID_HANDLE_VALUE)
{
try
{
FileImpl::handleLastErrorImpl(path);
}
catch (Poco::Exception& exc)
{
owner().scanError(&owner(), exc);
}
return;
}
bool stopped = false;
while (!stopped)
{
try
{
HANDLE h[2];
h[0] = _hStopped;
h[1] = hChange;
switch (WaitForMultipleObjects(2, h, FALSE, INFINITE))
{
case WAIT_OBJECT_0:
stopped = true;
break;
case WAIT_OBJECT_0 + 1:
{
ItemInfoMap newEntries;
scan(newEntries);
compare(entries, newEntries);
std::swap(entries, newEntries);
if (FindNextChangeNotification(hChange) == FALSE)
{
FileImpl::handleLastErrorImpl(path);
}
}
break;
default:
throw SystemException("failed to wait for directory changes");
}
}
catch (Poco::Exception& exc)
{
owner().scanError(&owner(), exc);
}
}
FindCloseChangeNotification(hChange);
}
void stop()
{
SetEvent(_hStopped);
}
bool supportsMoveEvents() const
{
return false;
}
private:
HANDLE _hStopped;
};
#elif POCO_OS == POCO_OS_LINUX || POCO_OS == POCO_OS_ANDROID
class LinuxDirectoryWatcherStrategy: public DirectoryWatcherStrategy
{
public:
LinuxDirectoryWatcherStrategy(DirectoryWatcher& owner):
DirectoryWatcherStrategy(owner),
_fd(-1),
_stopped(false)
{
_fd = inotify_init();
if (_fd == -1) throw Poco::IOException("cannot initialize inotify", errno);
}
~LinuxDirectoryWatcherStrategy()
{
close(_fd);
}
void run()
{
int mask = 0;
if (owner().eventMask() & DirectoryWatcher::DW_ITEM_ADDED)
mask |= IN_CREATE;
if (owner().eventMask() & DirectoryWatcher::DW_ITEM_REMOVED)
mask |= IN_DELETE;
if (owner().eventMask() & DirectoryWatcher::DW_ITEM_MODIFIED)
mask |= IN_MODIFY;
if (owner().eventMask() & DirectoryWatcher::DW_ITEM_MOVED_FROM)
mask |= IN_MOVED_FROM;
if (owner().eventMask() & DirectoryWatcher::DW_ITEM_MOVED_TO)
mask |= IN_MOVED_TO;
int wd = inotify_add_watch(_fd, owner().directory().path().c_str(), mask);
if (wd == -1)
{
try
{
FileImpl::handleLastErrorImpl(owner().directory().path());
}
catch (Poco::Exception& exc)
{
owner().scanError(&owner(), exc);
}
}
Poco::Buffer<char> buffer(4096);
while (!_stopped.load(std::memory_order_relaxed))
{
fd_set fds;
FD_ZERO(&fds);
FD_SET(_fd, &fds);
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 200000;
if (select(_fd + 1, &fds, NULL, NULL, &tv) == 1)
{
int n = read(_fd, buffer.begin(), buffer.size());
int i = 0;
if (n > 0)
{
while (n > 0)
{
struct inotify_event* event = reinterpret_cast<struct inotify_event*>(buffer.begin() + i);
if (event->len > 0)
{
if (!owner().eventsSuspended())
{
Poco::Path p(owner().directory().path());
p.makeDirectory();
p.setFileName(event->name);
Poco::File f(p.toString());
if ((event->mask & IN_CREATE) && (owner().eventMask() & DirectoryWatcher::DW_ITEM_ADDED))
{
DirectoryWatcher::DirectoryEvent ev(f, DirectoryWatcher::DW_ITEM_ADDED);
owner().itemAdded(&owner(), ev);
}
if ((event->mask & IN_DELETE) && (owner().eventMask() & DirectoryWatcher::DW_ITEM_REMOVED))
{
DirectoryWatcher::DirectoryEvent ev(f, DirectoryWatcher::DW_ITEM_REMOVED);
owner().itemRemoved(&owner(), ev);
}
if ((event->mask & IN_MODIFY) && (owner().eventMask() & DirectoryWatcher::DW_ITEM_MODIFIED))
{
DirectoryWatcher::DirectoryEvent ev(f, DirectoryWatcher::DW_ITEM_MODIFIED);
owner().itemModified(&owner(), ev);
}
if ((event->mask & IN_MOVED_FROM) && (owner().eventMask() & DirectoryWatcher::DW_ITEM_MOVED_FROM))
{
DirectoryWatcher::DirectoryEvent ev(f, DirectoryWatcher::DW_ITEM_MOVED_FROM);
owner().itemMovedFrom(&owner(), ev);
}
if ((event->mask & IN_MOVED_TO) && (owner().eventMask() & DirectoryWatcher::DW_ITEM_MOVED_TO))
{
DirectoryWatcher::DirectoryEvent ev(f, DirectoryWatcher::DW_ITEM_MOVED_TO);
owner().itemMovedTo(&owner(), ev);
}
}
}
i += sizeof(inotify_event) + event->len;
n -= sizeof(inotify_event) + event->len;
}
}
}
}
}
void stop()
{
_stopped.store(true, std::memory_order_relaxed);
}
bool supportsMoveEvents() const
{
return true;
}
private:
int _fd;
std::atomic<bool> _stopped;
};
#elif POCO_OS == POCO_OS_MAC_OS_X || POCO_OS == POCO_OS_FREE_BSD
class BSDDirectoryWatcherStrategy: public DirectoryWatcherStrategy
{
public:
BSDDirectoryWatcherStrategy(DirectoryWatcher& owner):
DirectoryWatcherStrategy(owner),
_queueFD(-1),
_dirFD(-1),
_stopped(false)
{
_dirFD = open(owner.directory().path().c_str(), O_EVTONLY);
if (_dirFD < 0) throw Poco::FileNotFoundException(owner.directory().path());
_queueFD = kqueue();
if (_queueFD < 0)
{
close(_dirFD);
throw Poco::SystemException("Cannot create kqueue", errno);
}
}
~BSDDirectoryWatcherStrategy()
{
close(_dirFD);
close(_queueFD);
}
void run()
{
Poco::Timestamp lastScan;
ItemInfoMap entries;
scan(entries);
while (!_stopped.load(std::memory_order_relaxed))
{
struct timespec timeout;
timeout.tv_sec = 0;
timeout.tv_nsec = 200000000;
unsigned eventFilter = NOTE_WRITE;
struct kevent event;
struct kevent eventData;
EV_SET(&event, _dirFD, EVFILT_VNODE, EV_ADD | EV_CLEAR, eventFilter, 0, 0);
int nEvents = kevent(_queueFD, &event, 1, &eventData, 1, &timeout);
if (nEvents < 0 || eventData.flags == EV_ERROR)
{
try
{
FileImpl::handleLastErrorImpl(owner().directory().path());
}
catch (Poco::Exception& exc)
{
owner().scanError(&owner(), exc);
}
}
else if (nEvents > 0 || ((owner().eventMask() & DirectoryWatcher::DW_ITEM_MODIFIED) && lastScan.isElapsed(owner().scanInterval()*1000000)))
{
ItemInfoMap newEntries;
scan(newEntries);
compare(entries, newEntries);
std::swap(entries, newEntries);
lastScan.update();
}
}
}
void stop()
{
_stopped.store(true, std::memory_order_relaxed);
}
bool supportsMoveEvents() const
{
return false;
}
private:
int _queueFD;
int _dirFD;
std::atomic<bool> _stopped;
};
#else
class PollingDirectoryWatcherStrategy: public DirectoryWatcherStrategy
{
public:
PollingDirectoryWatcherStrategy(DirectoryWatcher& owner):
DirectoryWatcherStrategy(owner)
{
}
~PollingDirectoryWatcherStrategy()
{
}
void run()
{
ItemInfoMap entries;
scan(entries);
while (!_stopped.tryWait(1000*owner().scanInterval()))
{
try
{
ItemInfoMap newEntries;
scan(newEntries);
compare(entries, newEntries);
std::swap(entries, newEntries);
}
catch (Poco::Exception& exc)
{
owner().scanError(&owner(), exc);
}
}
}
void stop()
{
_stopped.set();
}
bool supportsMoveEvents() const
{
return false;
}
private:
Poco::Event _stopped;
};
#endif
DirectoryWatcher::DirectoryWatcher(const std::string& path, int eventMask, int scanInterval):
_directory(path),
_eventMask(eventMask),
_scanInterval(scanInterval)
{
init();
}
DirectoryWatcher::DirectoryWatcher(const Poco::File& directory, int eventMask, int scanInterval):
_directory(directory),
_eventMask(eventMask),
_scanInterval(scanInterval)
{
init();
}
DirectoryWatcher::~DirectoryWatcher()
{
try
{
stop();
delete _pStrategy;
}
catch (...)
{
poco_unexpected();
}
}
void DirectoryWatcher::suspendEvents()
{
poco_assert (_eventsSuspended > 0);
_eventsSuspended--;
}
void DirectoryWatcher::resumeEvents()
{
_eventsSuspended++;
}
void DirectoryWatcher::init()
{
if (!_directory.exists())
throw Poco::FileNotFoundException(_directory.path());
if (!_directory.isDirectory())
throw Poco::InvalidArgumentException("not a directory", _directory.path());
#if POCO_OS == POCO_OS_WINDOWS_NT
_pStrategy = new WindowsDirectoryWatcherStrategy(*this);
#elif POCO_OS == POCO_OS_LINUX || POCO_OS == POCO_OS_ANDROID
_pStrategy = new LinuxDirectoryWatcherStrategy(*this);
#elif POCO_OS == POCO_OS_MAC_OS_X || POCO_OS == POCO_OS_FREE_BSD
_pStrategy = new BSDDirectoryWatcherStrategy(*this);
#else
_pStrategy = new PollingDirectoryWatcherStrategy(*this);
#endif
_thread.start(*this);
}
void DirectoryWatcher::run()
{
_pStrategy->run();
}
void DirectoryWatcher::stop()
{
_pStrategy->stop();
_thread.join();
}
bool DirectoryWatcher::supportsMoveEvents() const
{
return _pStrategy->supportsMoveEvents();
}
} // namespace Poco
#endif // POCO_NO_INOTIFY

View File

@ -42,10 +42,8 @@ if (CMAKE_CROSSCOMPILING)
if (ARCH_AARCH64)
# FIXME: broken dependencies
set (ENABLE_GRPC OFF CACHE INTERNAL "")
set (ENABLE_SENTRY OFF CACHE INTERNAL "")
elseif (ARCH_PPC64LE)
set (ENABLE_GRPC OFF CACHE INTERNAL "")
set (ENABLE_SENTRY OFF CACHE INTERNAL "")
elseif (ARCH_RISCV64)
# RISC-V support is preliminary
set (GLIBC_COMPATIBILITY OFF CACHE INTERNAL "")
@ -73,19 +71,10 @@ if (CMAKE_CROSSCOMPILING)
message (FATAL_ERROR "Trying to cross-compile to unsupported system: ${CMAKE_SYSTEM_NAME}!")
endif ()
if (USE_MUSL)
# use of undeclared identifier 'PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP'
set (ENABLE_SENTRY OFF CACHE INTERNAL "")
set (ENABLE_ODBC OFF CACHE INTERNAL "")
set (ENABLE_GRPC OFF CACHE INTERNAL "")
set (ENABLE_HDFS OFF CACHE INTERNAL "")
set (ENABLE_EMBEDDED_COMPILER OFF CACHE INTERNAL "")
# use of drand48_data
set (ENABLE_AZURE_BLOB_STORAGE OFF CACHE INTERNAL "")
endif ()
# Don't know why but CXX_STANDARD doesn't work for cross-compilation
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++20")
message (STATUS "Cross-compiling for target: ${CMAKE_CXX_COMPILE_TARGET}")
endif ()
if (USE_MUSL)
# Does not work for unknown reason
set (ENABLE_RUST OFF CACHE INTERNAL "")
endif ()

View File

@ -134,9 +134,9 @@ add_contrib (libuv-cmake libuv)
add_contrib (liburing-cmake liburing)
add_contrib (amqpcpp-cmake AMQP-CPP) # requires: libuv
add_contrib (cassandra-cmake cassandra) # requires: libuv
add_contrib (curl-cmake curl)
add_contrib (azure-cmake azure) # requires: curl
if (NOT OS_DARWIN)
add_contrib (curl-cmake curl)
add_contrib (azure-cmake azure) # requires: curl
add_contrib (sentry-native-cmake sentry-native) # requires: curl
endif()
add_contrib (fmtlib-cmake fmtlib)

2
contrib/azure vendored

@ -1 +1 @@
Subproject commit 096049bf24fffafcaccc132b9367694532716731
Subproject commit 352ff0a61cb319ac1cc38c4058443ddf70147530

View File

@ -10,7 +10,7 @@ set (LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/curl")
set (SRCS
"${LIBRARY_DIR}/lib/altsvc.c"
"${LIBRARY_DIR}/lib/amigaos.c"
"${LIBRARY_DIR}/lib/asyn-thread.c"
"${LIBRARY_DIR}/lib/asyn-ares.c"
"${LIBRARY_DIR}/lib/base64.c"
"${LIBRARY_DIR}/lib/bufq.c"
"${LIBRARY_DIR}/lib/bufref.c"
@ -165,13 +165,14 @@ target_compile_definitions (_curl PRIVATE
libcurl_EXPORTS
OS="${CMAKE_SYSTEM_NAME}"
)
target_include_directories (_curl SYSTEM PUBLIC
"${LIBRARY_DIR}/include"
"${LIBRARY_DIR}/lib"
. # curl_config.h
)
target_link_libraries (_curl PRIVATE OpenSSL::SSL)
target_link_libraries (_curl PRIVATE OpenSSL::SSL ch_contrib::c-ares)
# The library is large - avoid bloat (XXX: is it?)
if (OMIT_HEAVY_DEBUG_SYMBOLS)

View File

@ -50,3 +50,4 @@
#define ENABLE_IPV6
#define USE_OPENSSL
#define USE_THREADS_POSIX
#define USE_ARES

2
contrib/libhdfs3 vendored

@ -1 +1 @@
Subproject commit bdcb91354b1c05b21e73043a112a6f1e3b013497
Subproject commit b9598e6016720a7c088bfe85ce1fa0410f9d2103

View File

@ -26,6 +26,11 @@ ADD_DEFINITIONS(-D__STDC_FORMAT_MACROS)
ADD_DEFINITIONS(-D_GNU_SOURCE)
ADD_DEFINITIONS(-D_GLIBCXX_USE_NANOSLEEP)
ADD_DEFINITIONS(-DHAVE_NANOSLEEP)
if (USE_MUSL)
ADD_DEFINITIONS(-DSTRERROR_R_RETURN_INT)
endif ()
set(HAVE_STEADY_CLOCK 1)
set(HAVE_NESTED_EXCEPTION 1)
SET(HAVE_BOOST_CHRONO 0)

View File

@ -270,7 +270,7 @@ XMLPUBFUN void XMLCALL xmlCheckVersion(int version);
*
* Whether iconv support is available
*/
#if 1
#if 0
#define LIBXML_ICONV_ENABLED
#endif
@ -499,5 +499,3 @@ XMLPUBFUN void XMLCALL xmlCheckVersion(int version);
}
#endif /* __cplusplus */
#endif

@ -1 +1 @@
Subproject commit e7b8befca85c8b847614432dba250c22d35fbae0
Subproject commit 1834e42289c58402c804a87be4d489892b88f3ec

View File

@ -117,7 +117,7 @@ endif()
add_definitions(-DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX)
if (OS_LINUX OR OS_FREEBSD)
if ((OS_LINUX OR OS_FREEBSD) AND NOT USE_MUSL)
add_definitions(-DROCKSDB_PTHREAD_ADAPTIVE_MUTEX)
endif()

@ -1 +1 @@
Subproject commit ae10fb8c224c3f41571446e1ed7fd57b9e5e366b
Subproject commit bc359f86cbf0f73f6fd4b6bfb4ede0c1f8c9400f

View File

@ -13,6 +13,7 @@ set (SRC_DIR "${ClickHouse_SOURCE_DIR}/contrib/sentry-native")
set (SRCS
${SRC_DIR}/vendor/mpack.c
${SRC_DIR}/vendor/stb_sprintf.c
${SRC_DIR}/src/sentry_alloc.c
${SRC_DIR}/src/sentry_backend.c
${SRC_DIR}/src/sentry_core.c
@ -21,6 +22,7 @@ set (SRCS
${SRC_DIR}/src/sentry_json.c
${SRC_DIR}/src/sentry_logger.c
${SRC_DIR}/src/sentry_options.c
${SRC_DIR}/src/sentry_os.c
${SRC_DIR}/src/sentry_random.c
${SRC_DIR}/src/sentry_ratelimiter.c
${SRC_DIR}/src/sentry_scope.c
@ -29,6 +31,7 @@ set (SRCS
${SRC_DIR}/src/sentry_string.c
${SRC_DIR}/src/sentry_sync.c
${SRC_DIR}/src/sentry_transport.c
${SRC_DIR}/src/sentry_tracing.c
${SRC_DIR}/src/sentry_utils.c
${SRC_DIR}/src/sentry_uuid.c
${SRC_DIR}/src/sentry_value.c

View File

@ -1,7 +1,7 @@
option (ENABLE_ODBC "Enable ODBC library" ${ENABLE_LIBRARIES})
if (NOT OS_LINUX)
if (NOT OS_LINUX OR USE_MUSL)
if (ENABLE_ODBC)
message(STATUS "ODBC is only supported on Linux")
message(STATUS "ODBC is only supported on Linux with dynamic linking")
endif()
set (ENABLE_ODBC OFF CACHE INTERNAL "")
endif ()

View File

@ -145,6 +145,7 @@ def parse_env_variables(
RISCV_SUFFIX = "-riscv64"
S390X_SUFFIX = "-s390x"
AMD64_COMPAT_SUFFIX = "-amd64-compat"
AMD64_MUSL_SUFFIX = "-amd64-musl"
result = []
result.append("OUTPUT_DIR=/output")
@ -163,6 +164,7 @@ def parse_env_variables(
is_cross_s390x = compiler.endswith(S390X_SUFFIX)
is_cross_freebsd = compiler.endswith(FREEBSD_SUFFIX)
is_amd64_compat = compiler.endswith(AMD64_COMPAT_SUFFIX)
is_amd64_musl = compiler.endswith(AMD64_MUSL_SUFFIX)
if is_cross_darwin:
cc = compiler[: -len(DARWIN_SUFFIX)]
@ -232,6 +234,12 @@ def parse_env_variables(
cc = compiler[: -len(AMD64_COMPAT_SUFFIX)]
result.append("DEB_ARCH=amd64")
cmake_flags.append("-DNO_SSE3_OR_HIGHER=1")
elif is_amd64_musl:
cc = compiler[: -len(AMD64_MUSL_SUFFIX)]
result.append("DEB_ARCH=amd64")
cmake_flags.append(
"-DCMAKE_TOOLCHAIN_FILE=/build/cmake/linux/toolchain-x86_64-musl.cmake"
)
else:
cc = compiler
result.append("DEB_ARCH=amd64")
@ -396,6 +404,7 @@ def parse_args() -> argparse.Namespace:
"clang-17-riscv64",
"clang-17-s390x",
"clang-17-amd64-compat",
"clang-17-amd64-musl",
"clang-17-freebsd",
),
default="clang-17",

View File

@ -16,7 +16,7 @@ export LLVM_VERSION=${LLVM_VERSION:-17}
# it being undefined. Also read it as array so that we can pass an empty list
# of additional variable to cmake properly, and it doesn't generate an extra
# empty parameter.
# Read it as CMAKE_FLAGS to not lose exported FASTTEST_CMAKE_FLAGS on subsequential launch
# Read it as CMAKE_FLAGS to not lose exported FASTTEST_CMAKE_FLAGS on subsequent launch
read -ra CMAKE_FLAGS <<< "${FASTTEST_CMAKE_FLAGS:-}"
# Run only matching tests.
@ -197,7 +197,7 @@ function run_cmake
(
cd "$FASTTEST_BUILD"
cmake "$FASTTEST_SOURCE" -DCMAKE_CXX_COMPILER="clang++-${LLVM_VERSION}" -DCMAKE_C_COMPILER="clang-${LLVM_VERSION}" "${CMAKE_LIBS_CONFIG[@]}" "${CMAKE_FLAGS[@]}" 2>&1 | ts '%Y-%m-%d %H:%M:%S' | tee "$FASTTEST_OUTPUT/cmake_log.txt"
cmake "$FASTTEST_SOURCE" -DCMAKE_CXX_COMPILER="clang++-${LLVM_VERSION}" -DCMAKE_C_COMPILER="clang-${LLVM_VERSION}" -DCMAKE_TOOLCHAIN_FILE="${FASTTEST_SOURCE}/cmake/linux/toolchain-x86_64-musl.cmake" "${CMAKE_LIBS_CONFIG[@]}" "${CMAKE_FLAGS[@]}" 2>&1 | ts '%Y-%m-%d %H:%M:%S' | tee "$FASTTEST_OUTPUT/cmake_log.txt"
)
}

View File

@ -1,7 +1,7 @@
# docker build -t clickhouse/integration-helper .
# Helper docker container to run iptables without sudo
FROM alpine
FROM alpine:3.18
RUN apk add --no-cache -U iproute2 \
&& for bin in iptables iptables-restore iptables-save; \
do ln -sf xtables-nft-multi "/sbin/$bin"; \

View File

@ -28,18 +28,20 @@ sudo apt-get install clang-17
Lets remember the path where we install `cctools` as ${CCTOOLS}
``` bash
mkdir ~/cctools
export CCTOOLS=$(cd ~/cctools && pwd)
mkdir ${CCTOOLS}
cd ${CCTOOLS}
git clone --depth=1 https://github.com/tpoechtrager/apple-libtapi.git
git clone https://github.com/tpoechtrager/apple-libtapi.git
cd apple-libtapi
git checkout 15dfc2a8c9a2a89d06ff227560a69f5265b692f9
INSTALLPREFIX=${CCTOOLS} ./build.sh
./install.sh
cd ..
git clone --depth=1 https://github.com/tpoechtrager/cctools-port.git
git clone https://github.com/tpoechtrager/cctools-port.git
cd cctools-port/cctools
git checkout 2a3e1c2a6ff54a30f898b70cfb9ba1692a55fad7
./configure --prefix=$(readlink -f ${CCTOOLS}) --with-libtapi=$(readlink -f ${CCTOOLS}) --target=x86_64-apple-darwin
make install
```

View File

@ -3,7 +3,7 @@ slug: /en/development/build-osx
sidebar_position: 65
sidebar_label: Build on macOS
title: How to Build ClickHouse on macOS
description: How to build ClickHouse on macOS
description: How to build ClickHouse on macOS for macOS
---
:::info You don't have to build ClickHouse yourself!

View File

@ -7,42 +7,39 @@ description: Prerequisites and an overview of how to build ClickHouse
# Getting Started Guide for Building ClickHouse
The building of ClickHouse is supported on Linux, FreeBSD and macOS.
ClickHouse can be build on Linux, FreeBSD and macOS. If you use Windows, you can still build ClickHouse in a virtual machine running Linux, e.g. [VirtualBox](https://www.virtualbox.org/) with Ubuntu.
If you use Windows, you need to create a virtual machine with Ubuntu. To start working with a virtual machine please install VirtualBox. You can download Ubuntu from the website: https://www.ubuntu.com/#download. Please create a virtual machine from the downloaded image (you should reserve at least 4GB of RAM for it). To run a command-line terminal in Ubuntu, please locate a program containing the word “terminal” in its name (gnome-terminal, konsole etc.) or just press Ctrl+Alt+T.
ClickHouse cannot work or build on a 32-bit system. You should acquire access to a 64-bit system and you can continue reading.
ClickHouse requires a 64-bit system to compile and run, 32-bit systems do not work.
## Creating a Repository on GitHub {#creating-a-repository-on-github}
To start working with ClickHouse repository you will need a GitHub account.
To start developing for ClickHouse you will need a [GitHub](https://www.virtualbox.org/) account. Please also generate a SSH key locally (if you don't have one already) and upload the public key to GitHub as this is a prerequisite for contributing patches.
You probably already have one, but if you do not, please register at https://github.com. In case you do not have SSH keys, you should generate them and then upload them on GitHub. It is required for sending over your patches. It is also possible to use the same SSH keys that you use with any other SSH servers - probably you already have those.
Next, create a fork of the [ClickHouse repository](https://github.com/ClickHouse/ClickHouse/) in your personal account by clicking the "fork" button in the upper right corner.
Create a fork of ClickHouse repository. To do that please click on the “fork” button in the upper right corner at https://github.com/ClickHouse/ClickHouse. It will fork your own copy of ClickHouse/ClickHouse to your account.
To contribute, e.g. a fix for an issue or a feature, please commit your changes to a branch in your fork, then create a "pull request" with the changes to the main repository.
The development process consists of first committing the intended changes into your fork of ClickHouse and then creating a “pull request” for these changes to be accepted into the main repository (ClickHouse/ClickHouse).
For working with Git repositories, please install `git`. In Ubuntu run these commands in a terminal:
To work with Git repositories, please install `git`. To do that in Ubuntu you would run in the command line terminal:
```sh
sudo apt update
sudo apt install git
```
sudo apt update
sudo apt install git
A brief manual on using Git can be found [here](https://education.github.com/git-cheat-sheet-education.pdf).
For a detailed manual on Git see [here](https://git-scm.com/book/en/v2).
A cheatsheet for using Git can be found [here](https://education.github.com/git-cheat-sheet-education.pdf). The detailed manual for Git is [here](https://git-scm.com/book/en/v2).
## Cloning a Repository to Your Development Machine {#cloning-a-repository-to-your-development-machine}
Next, you need to download the source files onto your working machine. This is called “to clone a repository” because it creates a local copy of the repository on your working machine.
First, download the source files to your working machine, i.e. clone the repository:
Run in your terminal:
```sh
git clone git@github.com:your_github_username/ClickHouse.git # replace placeholder with your GitHub user name
cd ClickHouse
```
git clone git@github.com:your_github_username/ClickHouse.git # replace placeholder with your GitHub user name
cd ClickHouse
This command creates a directory `ClickHouse/` containing the source code of ClickHouse. If you specify a custom checkout directory after the URL but it is important that this path does not contain whitespaces as it may lead to problems with the build later on.
This command will create a directory `ClickHouse/` containing the source code of ClickHouse. If you specify a custom checkout directory (after the URL), it is important that this path does not contain whitespaces as it may lead to problems with the build system.
To make library dependencies available for the build, the ClickHouse repository uses Git submodules, i.e. references to external repositories. These are not checked out by default. To do so, you can either
The ClickHouse repository uses Git submodules, i.e. references to external repositories (usually 3rd party libraries used by ClickHouse). These are not checked out by default. To do so, you can either
- run `git clone` with option `--recurse-submodules`,
@ -52,7 +49,7 @@ To make library dependencies available for the build, the ClickHouse repository
You can check the Git status with the command: `git submodule status`.
If you get the following error message:
If you get the following error message
Permission denied (publickey).
fatal: Could not read from remote repository.
@ -60,7 +57,7 @@ If you get the following error message:
Please make sure you have the correct access rights
and the repository exists.
It generally means that the SSH keys for connecting to GitHub are missing. These keys are normally located in `~/.ssh`. For SSH keys to be accepted you need to upload them in the settings section of GitHub UI.
it generally means that the SSH keys for connecting to GitHub are missing. These keys are normally located in `~/.ssh`. For SSH keys to be accepted you need to upload them in GitHub's settings.
You can also clone the repository via https protocol:
@ -74,12 +71,17 @@ You can also add original ClickHouse repo address to your local repository to pu
After successfully running this command you will be able to pull updates from the main ClickHouse repo by running `git pull upstream master`.
:::note
Instructions below assume you are building on Linux. If you are cross-compiling or using building on macOS, please also check for operating system and architecture specific guides, such as building [on macOS for macOS](build-osx.md), [on Linux for macOS](build-cross-osx.md), [on Linux for Linux/RISC-V](build-cross-riscv.md) and so on.
:::
## Build System {#build-system}
ClickHouse uses CMake and Ninja for building.
CMake - a meta-build system that can generate Ninja files (build tasks).
Ninja - a smaller build system with a focus on the speed used to execute those cmake generated tasks.
- CMake - a meta-build system that can generate Ninja files (build tasks).
- Ninja - a smaller build system with a focus on the speed used to execute those cmake generated tasks.
To install on Ubuntu, Debian or Mint run `sudo apt install cmake ninja-build`.

View File

@ -2647,7 +2647,7 @@ Default value: 0.
## input_format_parallel_parsing {#input-format-parallel-parsing}
Enables or disables order-preserving parallel parsing of data formats. Supported only for [TSV](../../interfaces/formats.md/#tabseparated), [TKSV](../../interfaces/formats.md/#tskv), [CSV](../../interfaces/formats.md/#csv) and [JSONEachRow](../../interfaces/formats.md/#jsoneachrow) formats.
Enables or disables order-preserving parallel parsing of data formats. Supported only for [TSV](../../interfaces/formats.md/#tabseparated), [TSKV](../../interfaces/formats.md/#tskv), [CSV](../../interfaces/formats.md/#csv) and [JSONEachRow](../../interfaces/formats.md/#jsoneachrow) formats.
Possible values:
@ -2658,7 +2658,7 @@ Default value: `1`.
## output_format_parallel_formatting {#output-format-parallel-formatting}
Enables or disables parallel formatting of data formats. Supported only for [TSV](../../interfaces/formats.md/#tabseparated), [TKSV](../../interfaces/formats.md/#tskv), [CSV](../../interfaces/formats.md/#csv) and [JSONEachRow](../../interfaces/formats.md/#jsoneachrow) formats.
Enables or disables parallel formatting of data formats. Supported only for [TSV](../../interfaces/formats.md/#tabseparated), [TSKV](../../interfaces/formats.md/#tskv), [CSV](../../interfaces/formats.md/#csv) and [JSONEachRow](../../interfaces/formats.md/#jsoneachrow) formats.
Possible values:

View File

@ -216,7 +216,6 @@ Arguments:
- `--logger.level` — Log level.
- `--ignore-error` — do not stop processing if a query failed.
- `-c`, `--config-file` — path to configuration file in same format as for ClickHouse server, by default the configuration empty.
- `--no-system-tables` — do not attach system tables.
- `--help` — arguments references for `clickhouse-local`.
- `-V`, `--version` — print version information and exit.

View File

@ -319,9 +319,9 @@ This is a relatively fast non-cryptographic hash function of average quality for
Calculates a 64-bit hash code from any type of integer.
It works faster than intHash32. Average quality.
## SHA1, SHA224, SHA256, SHA512
## SHA1, SHA224, SHA256, SHA512, SHA512_256
Calculates SHA-1, SHA-224, SHA-256, SHA-512 hash from a string and returns the resulting set of bytes as [FixedString](/docs/en/sql-reference/data-types/fixedstring.md).
Calculates SHA-1, SHA-224, SHA-256, SHA-512, SHA-512-256 hash from a string and returns the resulting set of bytes as [FixedString](/docs/en/sql-reference/data-types/fixedstring.md).
**Syntax**

View File

@ -628,6 +628,8 @@ SELECT
formatReadableSize(filesize_bytes) AS filesize
```
Alias: `FORMAT_BYTES`.
``` text
┌─filesize_bytes─┬─filesize───┐
│ 1 │ 1.00 B │

View File

@ -393,40 +393,6 @@ Reverses the sequence of bytes in a string.
Reverses a sequence of Unicode code points in a string. Assumes that the string contains valid UTF-8 encoded text. If this assumption is violated, no exception is thrown and the result is undefined.
## format
Format the `pattern` string with the strings listed in the arguments, similar to formatting in Python. The pattern string can contain replacement fields surrounded by curly braces `{}`. Anything not contained in braces is considered literal text and copied verbatim into the output. Literal brace character can be escaped by two braces: `{{ '{{' }}` and `{{ '}}' }}`. Field names can be numbers (starting from zero) or empty (then they are implicitly given monotonically increasing numbers).
**Syntax**
```sql
format(pattern, s0, s1, …)
```
**Example**
``` sql
SELECT format('{1} {0} {1}', 'World', 'Hello')
```
```result
┌─format('{1} {0} {1}', 'World', 'Hello')─┐
│ Hello World Hello │
└─────────────────────────────────────────┘
```
With implicit numbers:
``` sql
SELECT format('{} {}', 'Hello', 'World')
```
```result
┌─format('{} {}', 'Hello', 'World')─┐
│ Hello World │
└───────────────────────────────────┘
```
## concat
Concatenates the given arguments.

View File

@ -132,6 +132,40 @@ For more information, see [RE2](https://github.com/google/re2/blob/master/re2/re
regexpQuoteMeta(s)
```
## format
Format the `pattern` string with the values (strings, integers, etc.) listed in the arguments, similar to formatting in Python. The pattern string can contain replacement fields surrounded by curly braces `{}`. Anything not contained in braces is considered literal text and copied verbatim into the output. Literal brace character can be escaped by two braces: `{{ '{{' }}` and `{{ '}}' }}`. Field names can be numbers (starting from zero) or empty (then they are implicitly given monotonically increasing numbers).
**Syntax**
```sql
format(pattern, s0, s1, …)
```
**Example**
``` sql
SELECT format('{1} {0} {1}', 'World', 'Hello')
```
```result
┌─format('{1} {0} {1}', 'World', 'Hello')─┐
│ Hello World Hello │
└─────────────────────────────────────────┘
```
With implicit numbers:
``` sql
SELECT format('{} {}', 'Hello', 'World')
```
```result
┌─format('{} {}', 'Hello', 'World')─┐
│ Hello World │
└───────────────────────────────────┘
```
## translate
Replaces characters in the string `s` using a one-to-one character mapping defined by `from` and `to` strings. `from` and `to` must be constant ASCII strings of the same size. Non-ASCII characters in the original string are not modified.

View File

@ -90,152 +90,11 @@ Views look the same as normal tables. For example, they are listed in the result
To delete a view, use [DROP VIEW](../../../sql-reference/statements/drop.md#drop-view). Although `DROP TABLE` works for VIEWs as well.
## Live View [Experimental]
## Live View [Deprecated]
:::note
This is an experimental feature that may change in backwards-incompatible ways in the future releases. Enable usage of live views and `WATCH` query using [allow_experimental_live_view](../../../operations/settings/settings.md#allow-experimental-live-view) setting. Input the command `set allow_experimental_live_view = 1`.
:::
This feature is deprecated and will be removed in the future.
```sql
CREATE LIVE VIEW [IF NOT EXISTS] [db.]table_name [WITH REFRESH [value_in_sec]] AS SELECT ...
```
Live views store result of the corresponding [SELECT](../../../sql-reference/statements/select/index.md) query and are updated any time the result of the query changes. Query result as well as partial result needed to combine with new data are stored in memory providing increased performance for repeated queries. Live views can provide push notifications when query result changes using the [WATCH](../../../sql-reference/statements/watch.md) query.
Live views are triggered by insert into the innermost table specified in the query.
Live views work similarly to how a query in a distributed table works. But instead of combining partial results from different servers they combine partial result from current data with partial result from the new data. When a live view query includes a subquery then the cached partial result is only stored for the innermost subquery.
:::info
- [Table function](../../../sql-reference/table-functions/index.md) is not supported as the innermost table.
- Tables that do not have inserts such as a [dictionary](../../../sql-reference/dictionaries/index.md), [system table](../../../operations/system-tables/index.md), a [normal view](#normal), or a [materialized view](#materialized) will not trigger a live view.
- Only queries where one can combine partial result from the old data plus partial result from the new data will work. Live view will not work for queries that require the complete data set to compute the final result or aggregations where the state of the aggregation must be preserved.
- Does not work with replicated or distributed tables where inserts are performed on different nodes.
- Can't be triggered by multiple tables.
See [WITH REFRESH](#live-view-with-refresh) to force periodic updates of a live view that in some cases can be used as a workaround.
:::
### Monitoring Live View Changes
You can monitor changes in the `LIVE VIEW` query result using [WATCH](../../../sql-reference/statements/watch.md) query.
```sql
WATCH [db.]live_view
```
**Example:**
```sql
CREATE TABLE mt (x Int8) Engine = MergeTree ORDER BY x;
CREATE LIVE VIEW lv AS SELECT sum(x) FROM mt;
```
Watch a live view while doing a parallel insert into the source table.
```sql
WATCH lv;
```
```bash
┌─sum(x)─┬─_version─┐
│ 1 │ 1 │
└────────┴──────────┘
┌─sum(x)─┬─_version─┐
│ 3 │ 2 │
└────────┴──────────┘
┌─sum(x)─┬─_version─┐
│ 6 │ 3 │
└────────┴──────────┘
```
```sql
INSERT INTO mt VALUES (1);
INSERT INTO mt VALUES (2);
INSERT INTO mt VALUES (3);
```
Or add [EVENTS](../../../sql-reference/statements/watch.md#events-clause) clause to just get change events.
```sql
WATCH [db.]live_view EVENTS;
```
**Example:**
```sql
WATCH lv EVENTS;
```
```bash
┌─version─┐
│ 1 │
└─────────┘
┌─version─┐
│ 2 │
└─────────┘
┌─version─┐
│ 3 │
└─────────┘
```
You can execute [SELECT](../../../sql-reference/statements/select/index.md) query on a live view in the same way as for any regular view or a table. If the query result is cached it will return the result immediately without running the stored query on the underlying tables.
```sql
SELECT * FROM [db.]live_view WHERE ...
```
### Force Live View Refresh
You can force live view refresh using the `ALTER LIVE VIEW [db.]table_name REFRESH` statement.
### WITH REFRESH Clause
When a live view is created with a `WITH REFRESH` clause then it will be automatically refreshed after the specified number of seconds elapse since the last refresh or trigger.
```sql
CREATE LIVE VIEW [db.]table_name WITH REFRESH [value_in_sec] AS SELECT ...
```
If the refresh value is not specified then the value specified by the [periodic_live_view_refresh](../../../operations/settings/settings.md#periodic-live-view-refresh) setting is used.
**Example:**
```sql
CREATE LIVE VIEW lv WITH REFRESH 5 AS SELECT now();
WATCH lv
```
```bash
┌───────────────now()─┬─_version─┐
│ 2021-02-21 08:47:05 │ 1 │
└─────────────────────┴──────────┘
┌───────────────now()─┬─_version─┐
│ 2021-02-21 08:47:10 │ 2 │
└─────────────────────┴──────────┘
┌───────────────now()─┬─_version─┐
│ 2021-02-21 08:47:15 │ 3 │
└─────────────────────┴──────────┘
```
```sql
WATCH lv
```
```
Code: 60. DB::Exception: Received from localhost:9000. DB::Exception: Table default.lv does not exist..
```
### Live View Usage
Most common uses of live view tables include:
- Providing push notifications for query result changes to avoid polling.
- Caching results of most frequent queries to provide immediate query results.
- Watching for table changes and triggering a follow-up select queries.
- Watching metrics from system tables using periodic refresh.
**See Also**
- [ALTER LIVE VIEW](../alter/view.md#alter-live-view)
For your convenience, the old documentation is located [here](https://pastila.nl/?00f32652/fdf07272a7b54bda7e13b919264e449f.md)
## Window View [Experimental]

View File

@ -2249,7 +2249,7 @@ SELECT * FROM test_table
## input_format_parallel_parsing {#input-format-parallel-parsing}
Включает или отключает режим, при котором входящие данные разбиваются на части, парсинг каждой из которых осуществляется параллельно с сохранением исходного порядка. Поддерживается только для форматов [TSV](../../interfaces/formats.md#tabseparated), [TKSV](../../interfaces/formats.md#tskv), [CSV](../../interfaces/formats.md#csv) и [JSONEachRow](../../interfaces/formats.md#jsoneachrow).
Включает или отключает режим, при котором входящие данные разбиваются на части, парсинг каждой из которых осуществляется параллельно с сохранением исходного порядка. Поддерживается только для форматов [TSV](../../interfaces/formats.md#tabseparated), [TSKV](../../interfaces/formats.md#tskv), [CSV](../../interfaces/formats.md#csv) и [JSONEachRow](../../interfaces/formats.md#jsoneachrow).
Возможные значения:
@ -2260,7 +2260,7 @@ SELECT * FROM test_table
## output_format_parallel_formatting {#output-format-parallel-formatting}
Включает или отключает режим, при котором исходящие данные форматируются параллельно с сохранением исходного порядка. Поддерживается только для форматов [TSV](../../interfaces/formats.md#tabseparated), [TKSV](../../interfaces/formats.md#tskv), [CSV](../../interfaces/formats.md#csv) и [JSONEachRow](../../interfaces/formats.md#jsoneachrow).
Включает или отключает режим, при котором исходящие данные форматируются параллельно с сохранением исходного порядка. Поддерживается только для форматов [TSV](../../interfaces/formats.md#tabseparated), [TSKV](../../interfaces/formats.md#tskv), [CSV](../../interfaces/formats.md#csv) и [JSONEachRow](../../interfaces/formats.md#jsoneachrow).
Возможные значения:

View File

@ -45,7 +45,6 @@ $ clickhouse-local --structure "table_structure" --input-format "format_of_incom
- `--logger.level` — уровень логирования.
- `--ignore-error` — не прекращать обработку если запрос выдал ошибку.
- `-c`, `--config-file` — путь к файлу конфигурации. По умолчанию `clickhouse-local` запускается с пустой конфигурацией. Конфигурационный файл имеет тот же формат, что и для сервера ClickHouse, и в нём можно использовать все конфигурационные параметры сервера. Обычно подключение конфигурации не требуется; если требуется установить отдельный параметр, то это можно сделать ключом с именем параметра.
- `--no-system-tables` — запуск без использования системных таблиц.
- `--help` — вывод справочной информации о `clickhouse-local`.
- `-V`, `--version` — вывод текущей версии и выход.

View File

@ -1203,7 +1203,7 @@ ClickHouse生成异常
- 类型:布尔
- 默认值True
启用数据格式的保序并行分析。 仅支持TSVTKSVCSV和JSONEachRow格式。
启用数据格式的保序并行分析。 仅支持TSVTSKVCSV和JSONEachRow格式。
## min_chunk_bytes_for_parallel_parsing {#min-chunk-bytes-for-parallel-parsing}

View File

@ -45,7 +45,6 @@ clickhouse-local --structure "table_structure" --input-format "format_of_incomin
- `--logger.level` — 日志级别。
- `--ignore-error` — 当查询失败时,不停止处理。
- `-c`, `--config-file` — 与ClickHouse服务器格式相同配置文件的路径默认情况下配置为空。
- `--no-system-tables` — 不附加系统表。
- `--help``clickhouse-local`使用帮助信息。
- `-V`, `--version` — 打印版本信息并退出。

View File

@ -328,6 +328,13 @@ try
config().getUInt("max_thread_pool_free_size", 1000),
config().getUInt("thread_pool_queue_size", 10000)
);
/// Wait for all threads to avoid possible use-after-free (for example logging objects can be already destroyed).
SCOPE_EXIT({
Stopwatch watch;
LOG_INFO(log, "Waiting for background threads");
GlobalThreadPool::instance().shutdown();
LOG_INFO(log, "Background threads finished in {} ms", watch.elapsedMilliseconds());
});
static ServerErrorHandler error_handler;
Poco::ErrorHandler::set(&error_handler);

View File

@ -744,7 +744,7 @@ void LocalServer::processConfig()
LOG_DEBUG(log, "Loading metadata from {}", path);
auto startup_system_tasks = loadMetadataSystem(global_context);
attachSystemTablesLocal(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::SYSTEM_DATABASE));
attachSystemTablesLocal</* lazy= */ true>(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::SYSTEM_DATABASE));
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA));
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE));
waitLoad(TablesLoaderForegroundPoolId, startup_system_tasks);
@ -761,9 +761,9 @@ void LocalServer::processConfig()
LOG_DEBUG(log, "Loaded metadata.");
}
else if (!config().has("no-system-tables"))
else
{
attachSystemTablesLocal(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::SYSTEM_DATABASE));
attachSystemTablesLocal</* lazy= */ true>(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::SYSTEM_DATABASE));
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA));
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE));
}
@ -842,7 +842,6 @@ void LocalServer::addOptions(OptionsDescription & options_description)
("logger.log", po::value<std::string>(), "Log file name")
("logger.level", po::value<std::string>(), "Log level")
("no-system-tables", "do not attach system tables (better startup time)")
("path", po::value<std::string>(), "Storage path")
("only-system-tables", "attach only system tables from specified path")
("top_level_domains_path", po::value<std::string>(), "Path to lists with custom TLDs")
@ -871,8 +870,6 @@ void LocalServer::processOptions(const OptionsDescription &, const CommandLineOp
config().setString("table-file", options["file"].as<std::string>());
if (options.count("structure"))
config().setString("table-structure", options["structure"].as<std::string>());
if (options.count("no-system-tables"))
config().setBool("no-system-tables", true);
if (options.count("only-system-tables"))
config().setBool("only-system-tables", true);
if (options.count("database"))

View File

@ -657,6 +657,11 @@ try
CurrentMetrics::set(CurrentMetrics::Revision, ClickHouseRevision::getVersionRevision());
CurrentMetrics::set(CurrentMetrics::VersionInteger, ClickHouseRevision::getVersionInteger());
Poco::ThreadPool server_pool(3, server_settings.max_connections);
std::mutex servers_lock;
std::vector<ProtocolServerAdapter> servers;
std::vector<ProtocolServerAdapter> servers_to_start_before_tables;
/** Context contains all that query execution is dependent:
* settings, available functions, data types, aggregate functions, databases, ...
*/
@ -697,6 +702,68 @@ try
server_settings.max_thread_pool_size,
server_settings.max_thread_pool_free_size,
server_settings.thread_pool_queue_size);
/// Wait for all threads to avoid possible use-after-free (for example logging objects can be already destroyed).
SCOPE_EXIT({
Stopwatch watch;
LOG_INFO(log, "Waiting for background threads");
GlobalThreadPool::instance().shutdown();
LOG_INFO(log, "Background threads finished in {} ms", watch.elapsedMilliseconds());
});
/// NOTE: global context should be destroyed *before* GlobalThreadPool::shutdown()
/// Otherwise GlobalThreadPool::shutdown() will hang, since Context holds some threads.
SCOPE_EXIT({
/** Ask to cancel background jobs all table engines,
* and also query_log.
* It is important to do early, not in destructor of Context, because
* table engines could use Context on destroy.
*/
LOG_INFO(log, "Shutting down storages.");
global_context->shutdown();
LOG_DEBUG(log, "Shut down storages.");
if (!servers_to_start_before_tables.empty())
{
LOG_DEBUG(log, "Waiting for current connections to servers for tables to finish.");
size_t current_connections = 0;
{
std::lock_guard lock(servers_lock);
for (auto & server : servers_to_start_before_tables)
{
server.stop();
current_connections += server.currentConnections();
}
}
if (current_connections)
LOG_INFO(log, "Closed all listening sockets. Waiting for {} outstanding connections.", current_connections);
else
LOG_INFO(log, "Closed all listening sockets.");
if (current_connections > 0)
current_connections = waitServersToFinish(servers_to_start_before_tables, servers_lock, server_settings.shutdown_wait_unfinished);
if (current_connections)
LOG_INFO(log, "Closed connections to servers for tables. But {} remain. Probably some tables of other users cannot finish their connections after context shutdown.", current_connections);
else
LOG_INFO(log, "Closed connections to servers for tables.");
}
global_context->shutdownKeeperDispatcher();
/// Wait server pool to avoid use-after-free of destroyed context in the handlers
server_pool.joinAll();
/** Explicitly destroy Context. It is more convenient than in destructor of Server, because logger is still available.
* At this moment, no one could own shared part of Context.
*/
global_context.reset();
shared_context.reset();
LOG_DEBUG(log, "Destroyed global context.");
});
#if USE_AZURE_BLOB_STORAGE
/// It makes sense to deinitialize libxml after joining of all threads
@ -755,10 +822,6 @@ try
}
}
Poco::ThreadPool server_pool(3, server_settings.max_connections);
std::mutex servers_lock;
std::vector<ProtocolServerAdapter> servers;
std::vector<ProtocolServerAdapter> servers_to_start_before_tables;
/// This object will periodically calculate some metrics.
ServerAsynchronousMetrics async_metrics(
global_context,
@ -1598,60 +1661,6 @@ try
/// try set up encryption. There are some errors in config, error will be printed and server wouldn't start.
CompressionCodecEncrypted::Configuration::instance().load(config(), "encryption_codecs");
SCOPE_EXIT({
async_metrics.stop();
/** Ask to cancel background jobs all table engines,
* and also query_log.
* It is important to do early, not in destructor of Context, because
* table engines could use Context on destroy.
*/
LOG_INFO(log, "Shutting down storages.");
global_context->shutdown();
LOG_DEBUG(log, "Shut down storages.");
if (!servers_to_start_before_tables.empty())
{
LOG_DEBUG(log, "Waiting for current connections to servers for tables to finish.");
size_t current_connections = 0;
{
std::lock_guard lock(servers_lock);
for (auto & server : servers_to_start_before_tables)
{
server.stop();
current_connections += server.currentConnections();
}
}
if (current_connections)
LOG_INFO(log, "Closed all listening sockets. Waiting for {} outstanding connections.", current_connections);
else
LOG_INFO(log, "Closed all listening sockets.");
if (current_connections > 0)
current_connections = waitServersToFinish(servers_to_start_before_tables, servers_lock, server_settings.shutdown_wait_unfinished);
if (current_connections)
LOG_INFO(log, "Closed connections to servers for tables. But {} remain. Probably some tables of other users cannot finish their connections after context shutdown.", current_connections);
else
LOG_INFO(log, "Closed connections to servers for tables.");
global_context->shutdownKeeperDispatcher();
}
/// Wait server pool to avoid use-after-free of destroyed context in the handlers
server_pool.joinAll();
/** Explicitly destroy Context. It is more convenient than in destructor of Server, because logger is still available.
* At this moment, no one could own shared part of Context.
*/
global_context.reset();
shared_context.reset();
LOG_DEBUG(log, "Destroyed global context.");
});
/// DNSCacheUpdater uses BackgroundSchedulePool which lives in shared context
/// and thus this object must be created after the SCOPE_EXIT object where shared
/// context is destroyed.

View File

@ -110,7 +110,7 @@ public:
}
}
}
else
else if (row_begin < row_end)
{
size_t pos = First ? row_begin : row_end - 1;
add(place, columns, pos, arena);

View File

@ -549,8 +549,10 @@ public:
auto to = std::lower_bound(offsets.begin(), offsets.end(), row_end) - offsets.begin() + 1;
size_t num_defaults = (row_end - row_begin) - (to - from);
static_cast<const Derived *>(this)->addBatchSinglePlace(from, to, place, &values, arena, -1);
static_cast<const Derived *>(this)->addManyDefaults(place, &values, num_defaults, arena);
if (from < to)
static_cast<const Derived *>(this)->addBatchSinglePlace(from, to, place, &values, arena, -1);
if (num_defaults > 0)
static_cast<const Derived *>(this)->addManyDefaults(place, &values, num_defaults, arena);
}
void addBatchSinglePlaceNotNull( /// NOLINT

View File

@ -33,8 +33,17 @@ public:
if (function_node->getArguments().getNodes().size() != 3)
return;
auto result_type = function_node->getResultType();
function_node->resolveAsFunction(if_function_ptr->build(function_node->getArgumentColumns()));
auto if_function_value = if_function_ptr->build(function_node->getArgumentColumns());
if (!if_function_value->getResultType()->equals(*function_node->getResultType()))
{
/** We faced some corner case, when result type of `if` and `multiIf` are different.
* For example, currently `if(NULL`, a, b)` returns type of `a` column,
* but multiIf(NULL, a, b) returns supertypetype of `a` and `b`.
*/
return;
}
function_node->resolveAsFunction(std::move(if_function_value));
}
private:

View File

@ -7,7 +7,7 @@
#include <cassert>
#include <cstring>
#include <unistd.h>
#include <sys/select.h>
#include <poll.h>
#include <sys/time.h>
#include <sys/types.h>
@ -27,11 +27,8 @@ void trim(String & s)
/// Allows delaying the start of query execution until the entirety of query is inserted.
bool hasInputData()
{
timeval timeout = {0, 0};
fd_set fds{};
FD_ZERO(&fds);
FD_SET(STDIN_FILENO, &fds);
return select(1, &fds, nullptr, nullptr, &timeout) == 1;
pollfd fd{STDIN_FILENO, POLLIN, 0};
return poll(&fd, 1, 0) == 1;
}
struct NoCaseCompare

View File

@ -6,6 +6,7 @@
#include <IO/copyData.h>
#include <algorithm>
#include <cstdlib>
#include <stdexcept>
#include <chrono>
#include <cerrno>
@ -21,6 +22,7 @@
#include <fstream>
#include <filesystem>
#include <fmt/format.h>
#include <Common/quoteString.h>
#include "config.h" // USE_SKIM
#if USE_SKIM
@ -94,7 +96,14 @@ int executeCommand(char * const argv[])
throw std::runtime_error(fmt::format("Cannot waitpid {}: {}", pid, errnoToString()));
} while (true);
return status;
if (WIFEXITED(status))
return WEXITSTATUS(status);
if (WIFSIGNALED(status))
throw std::runtime_error(fmt::format("Child process was terminated by signal {}", WTERMSIG(status)));
if (WIFSTOPPED(status))
throw std::runtime_error(fmt::format("Child process was stopped by signal {}", WSTOPSIG(status)));
throw std::runtime_error("Child process was not exited normally by unknown reason");
}
void writeRetry(int fd, const std::string & data)
@ -504,22 +513,29 @@ void ReplxxLineReader::addToHistory(const String & line)
void ReplxxLineReader::openEditor()
{
TemporaryFile editor_file("clickhouse_client_editor_XXXXXX.sql");
editor_file.write(rx.get_state().text());
editor_file.close();
char * const argv[] = {editor.data(), editor_file.getPath().data(), nullptr};
try
{
if (executeCommand(argv) == 0)
TemporaryFile editor_file("clickhouse_client_editor_XXXXXX.sql");
editor_file.write(rx.get_state().text());
editor_file.close();
char * const argv[] = {editor.data(), editor_file.getPath().data(), nullptr};
int editor_exit_code = executeCommand(argv);
if (editor_exit_code == EXIT_SUCCESS)
{
const std::string & new_query = readFile(editor_file.getPath());
rx.set_state(replxx::Replxx::State(new_query.c_str(), static_cast<int>(new_query.size())));
}
else
{
rx.print(fmt::format("Editor {} terminated unsuccessfully: {}\n", backQuoteIfNeed(editor), editor_exit_code).data());
}
}
catch (const std::runtime_error & e)
{
rx.print(e.what());
rx.print("\n");
}
if (bracketed_paste_enabled)

View File

@ -238,6 +238,7 @@
M(DictCacheLockReadNs, "Number of nanoseconds spend in waiting for read lock to lookup the data for the dictionaries of 'cache' types.") \
\
M(DistributedSyncInsertionTimeoutExceeded, "A timeout has exceeded while waiting for shards during synchronous insertion into a Distributed table (with 'distributed_foreground_insert' = 1)") \
M(DistributedAsyncInsertionFailures, "Number of failures for asynchronous insertion into a Distributed table (with 'distributed_foreground_insert' = 0)") \
M(DataAfterMergeDiffersFromReplica, R"(
Number of times data after merge is not byte-identical to the data on another replicas. There could be several reasons:
1. Using newer version of compression library after server update.

View File

@ -3,6 +3,8 @@
#include <Common/Exception.h>
#include <Common/CurrentMetrics.h>
#include <Common/ProfileEvents.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
namespace ProfileEvents
@ -155,25 +157,34 @@ RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id, const std::c
if (type == Type::Write)
{
/// Always add a group for a writer (writes are never performed simultaneously).
writers_queue.emplace_back(type); /// SM1: may throw (nothing to roll back)
}
else if (readers_queue.empty() ||
(rdlock_owner == readers_queue.begin() && readers_queue.size() == 1 && !writers_queue.empty()))
else
{
readers_queue.emplace_back(type); /// SM1: may throw (nothing to roll back)
/// We don't always add a group to readers_queue here because multiple readers can use the same group.
/// We can reuse the last group if the last group didn't get ownership yet,
/// or even if it got ownership but there are no writers waiting in writers_queue.
bool can_use_last_group = !readers_queue.empty() && (!readers_queue.back().ownership || writers_queue.empty());
if (!can_use_last_group)
readers_queue.emplace_back(type); /// SM1: may throw (nothing to roll back)
}
GroupsContainer::iterator it_group =
(type == Type::Write) ? std::prev(writers_queue.end()) : std::prev(readers_queue.end());
/// Lock is free to acquire
if (rdlock_owner == readers_queue.end() && wrlock_owner == writers_queue.end())
{
/// Set `rdlock_owner` or `wrlock_owner` and make it owner.
(type == Read ? rdlock_owner : wrlock_owner) = it_group; /// SM2: nothrow
grantOwnership(it_group);
}
else
{
/// Wait until our group becomes the lock owner
const auto predicate = [&] () { return it_group == (type == Read ? rdlock_owner : wrlock_owner); };
const auto predicate = [&] () { return it_group->ownership; };
if (lock_deadline_tp == std::chrono::time_point<std::chrono::steady_clock>::max())
{
@ -193,15 +204,20 @@ RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id, const std::c
/// Rollback(SM1): nothrow
if (it_group->requests == 0)
{
/// When WRITE lock fails, we need to notify next read that is waiting,
/// to avoid handing request, hence next=true.
dropOwnerGroupAndPassOwnership(it_group, /* next= */ true);
((type == Read) ? readers_queue : writers_queue).erase(it_group);
}
/// While we were waiting for this write lock (which has just failed) more readers could start waiting,
/// we need to wake up them now.
if ((rdlock_owner != readers_queue.end()) && writers_queue.empty())
grantOwnershipToAllReaders();
return nullptr;
}
}
}
/// Our group must be an owner here.
chassert(it_group->ownership);
if (request_has_query_id)
{
try
@ -216,7 +232,7 @@ RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id, const std::c
/// Methods std::list<>::emplace_back() and std::unordered_map<>::emplace() provide strong exception safety
/// We only need to roll back the changes to these objects: owner_queries and the readers/writers queue
if (it_group->requests == 0)
dropOwnerGroupAndPassOwnership(it_group, /* next= */ false); /// Rollback(SM1): nothrow
dropOwnerGroupAndPassOwnership(it_group); /// Rollback(SM1): nothrow
throw;
}
@ -237,19 +253,28 @@ RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id, const std::c
* it is guaranteed that all three steps have been executed successfully and the resulting state is consistent.
* With the mutex locked the order of steps to restore the lock's state can be arbitrary
*
* We do not employ try-catch: if something bad happens, there is nothing we can do =(
* We do not employ try-catch: if something bad happens and chassert() is disabled, there is nothing we can do
* (we can't throw an exception here because RWLockImpl::unlock() is called from the destructor ~LockHolderImpl).
*/
void RWLockImpl::unlock(GroupsContainer::iterator group_it, const String & query_id) noexcept
{
std::lock_guard state_lock(internal_state_mtx);
/// All of these are Undefined behavior and nothing we can do!
if (rdlock_owner == readers_queue.end() && wrlock_owner == writers_queue.end())
/// Our group must be an owner here.
if (!group_it->ownership)
{
chassert(false && "RWLockImpl::unlock() is called for a non-owner group");
return;
if (rdlock_owner != readers_queue.end() && group_it != rdlock_owner)
return;
if (wrlock_owner != writers_queue.end() && group_it != wrlock_owner)
}
/// Check consistency.
if ((group_it->type == Read)
? !(rdlock_owner != readers_queue.end() && wrlock_owner == writers_queue.end())
: !(wrlock_owner != writers_queue.end() && rdlock_owner == readers_queue.end() && group_it == wrlock_owner))
{
chassert(false && "RWLockImpl::unlock() found the rwlock inconsistent");
return;
}
/// If query_id is not empty it must be listed in parent->owner_queries
if (query_id != NO_QUERY)
@ -264,12 +289,26 @@ void RWLockImpl::unlock(GroupsContainer::iterator group_it, const String & query
/// If we are the last remaining referrer, remove this QNode and notify the next one
if (--group_it->requests == 0) /// SM: nothrow
dropOwnerGroupAndPassOwnership(group_it, /* next= */ false);
dropOwnerGroupAndPassOwnership(group_it);
}
void RWLockImpl::dropOwnerGroupAndPassOwnership(GroupsContainer::iterator group_it, bool next) noexcept
void RWLockImpl::dropOwnerGroupAndPassOwnership(GroupsContainer::iterator group_it) noexcept
{
/// All readers with ownership must finish before switching to write phase.
/// Such readers has iterators from `readers_queue.begin()` to `rdlock_owner`, so if `rdlock_owner` is equal to `readers_queue.begin()`
/// that means there is only one reader with ownership left in the readers_queue and we can proceed to generic procedure.
if ((group_it->type == Read) && (rdlock_owner != readers_queue.begin()) && (rdlock_owner != readers_queue.end()))
{
if (rdlock_owner == group_it)
--rdlock_owner;
readers_queue.erase(group_it);
/// If there are no writers waiting in writers_queue then we can wake up other readers.
if (writers_queue.empty())
grantOwnershipToAllReaders();
return;
}
rdlock_owner = readers_queue.end();
wrlock_owner = writers_queue.end();
@ -278,42 +317,86 @@ void RWLockImpl::dropOwnerGroupAndPassOwnership(GroupsContainer::iterator group_
readers_queue.erase(group_it);
/// Prepare next phase
if (!writers_queue.empty())
{
wrlock_owner = writers_queue.begin();
}
else
{
rdlock_owner = readers_queue.begin();
}
}
else
{
writers_queue.erase(group_it);
/// Prepare next phase
if (!readers_queue.empty())
{
if (next && readers_queue.size() > 1)
{
rdlock_owner = std::next(readers_queue.begin());
}
else
{
rdlock_owner = readers_queue.begin();
}
}
rdlock_owner = readers_queue.begin();
else
{
wrlock_owner = writers_queue.begin();
}
}
if (rdlock_owner != readers_queue.end())
{
rdlock_owner->cv.notify_all();
grantOwnershipToAllReaders();
}
else if (wrlock_owner != writers_queue.end())
{
wrlock_owner->cv.notify_one();
grantOwnership(wrlock_owner);
}
}
void RWLockImpl::grantOwnership(GroupsContainer::iterator group_it) noexcept
{
if (!group_it->ownership)
{
group_it->ownership = true;
group_it->cv.notify_all();
}
}
void RWLockImpl::grantOwnershipToAllReaders() noexcept
{
if (rdlock_owner != readers_queue.end())
{
size_t num_new_owners = 0;
for (;;)
{
if (!rdlock_owner->ownership)
++num_new_owners;
grantOwnership(rdlock_owner);
if (std::next(rdlock_owner) == readers_queue.end())
break;
++rdlock_owner;
}
/// There couldn't be more than one reader group which is not an owner.
/// (Because we add a new reader group only if the last reader group is already an owner - see the `can_use_last_group` variable.)
chassert(num_new_owners <= 1);
}
}
std::unordered_map<String, size_t> RWLockImpl::getOwnerQueryIds() const
{
std::lock_guard lock{internal_state_mtx};
return owner_queries;
}
String RWLockImpl::getOwnerQueryIdsDescription() const
{
auto map = getOwnerQueryIds();
WriteBufferFromOwnString out;
bool need_comma = false;
for (const auto & [query_id, num_owners] : map)
{
if (need_comma)
out << ", ";
out << query_id;
if (num_owners != 1)
out << " (" << num_owners << ")";
need_comma = true;
}
return out.str();
}
}

View File

@ -62,35 +62,42 @@ public:
inline static const String NO_QUERY = String();
inline static const auto default_locking_timeout_ms = std::chrono::milliseconds(120000);
/// Returns all query_id owning locks (both read and write) right now.
/// !! This function are for debugging and logging purposes only, DO NOT use them for synchronization!
std::unordered_map<String, size_t> getOwnerQueryIds() const;
String getOwnerQueryIdsDescription() const;
private:
/// Group of locking requests that should be granted simultaneously
/// i.e. one or several readers or a single writer
struct Group
{
const Type type;
size_t requests;
size_t requests = 0;
bool ownership = false; /// whether this group got ownership? (that means `cv` is notified and the locking requests should stop waiting)
std::condition_variable cv; /// all locking requests of the group wait on this condvar
explicit Group(Type type_) : type{type_}, requests{0} {}
explicit Group(Type type_) : type{type_} {}
};
using GroupsContainer = std::list<Group>;
using OwnerQueryIds = std::unordered_map<String, size_t>;
using OwnerQueryIds = std::unordered_map<String /* query_id */, size_t /* num_owners */>;
mutable std::mutex internal_state_mtx;
GroupsContainer readers_queue;
GroupsContainer writers_queue;
GroupsContainer::iterator rdlock_owner{readers_queue.end()}; /// equals to readers_queue.begin() in read phase
/// or readers_queue.end() otherwise
GroupsContainer::iterator rdlock_owner{readers_queue.end()}; /// last group with ownership in readers_queue in read phase
/// or readers_queue.end() in writer phase
GroupsContainer::iterator wrlock_owner{writers_queue.end()}; /// equals to writers_queue.begin() in write phase
/// or writers_queue.end() otherwise
/// or writers_queue.end() in read phase
OwnerQueryIds owner_queries;
RWLockImpl() = default;
void unlock(GroupsContainer::iterator group_it, const String & query_id) noexcept;
/// @param next - notify next after begin, used on writer lock failures
void dropOwnerGroupAndPassOwnership(GroupsContainer::iterator group_it, bool next) noexcept;
void dropOwnerGroupAndPassOwnership(GroupsContainer::iterator group_it) noexcept;
void grantOwnership(GroupsContainer::iterator group_it) noexcept;
void grantOwnershipToAllReaders() noexcept;
};
}

View File

@ -500,3 +500,10 @@ GlobalThreadPool & GlobalThreadPool::instance()
return *the_instance;
}
void GlobalThreadPool::shutdown()
{
if (the_instance)
{
the_instance->finalize();
}
}

View File

@ -109,6 +109,8 @@ public:
void addOnDestroyCallback(OnDestroyCallback && callback);
private:
friend class GlobalThreadPool;
mutable std::mutex mutex;
std::condition_variable job_finished;
std::condition_variable new_job_or_shutdown;
@ -205,6 +207,7 @@ class GlobalThreadPool : public FreeThreadPool, private boost::noncopyable
public:
static void initialize(size_t max_threads = 10000, size_t max_free_threads = 1000, size_t queue_size = 10000);
static GlobalThreadPool & instance();
static void shutdown();
};

View File

@ -144,6 +144,7 @@ const char * errorMessage(Error code)
case Error::ZCLOSING: return "ZooKeeper is closing";
case Error::ZNOTHING: return "(not error) no server responses to process";
case Error::ZSESSIONMOVED: return "Session moved to another server, so operation is ignored";
case Error::ZNOTREADONLY: return "State-changing request is passed to read-only server";
}
UNREACHABLE();
@ -156,7 +157,8 @@ bool isHardwareError(Error zk_return_code)
|| zk_return_code == Error::ZSESSIONMOVED
|| zk_return_code == Error::ZCONNECTIONLOSS
|| zk_return_code == Error::ZMARSHALLINGERROR
|| zk_return_code == Error::ZOPERATIONTIMEOUT;
|| zk_return_code == Error::ZOPERATIONTIMEOUT
|| zk_return_code == Error::ZNOTREADONLY;
}
bool isUserError(Error zk_return_code)
@ -196,4 +198,3 @@ void MultiResponse::removeRootPath(const String & root_path)
}
}

View File

@ -109,7 +109,8 @@ enum class Error : int32_t
ZAUTHFAILED = -115, /// Client authentication failed
ZCLOSING = -116, /// ZooKeeper is closing
ZNOTHING = -117, /// (not error) no server responses to process
ZSESSIONMOVED = -118 /// Session moved to another server, so operation is ignored
ZSESSIONMOVED = -118, /// Session moved to another server, so operation is ignored
ZNOTREADONLY = -119, /// State-changing request is passed to read-only server
};
/// Network errors and similar. You should reinitialize ZooKeeper session in case of these errors
@ -445,6 +446,7 @@ enum State
CONNECTING = 1,
ASSOCIATING = 2,
CONNECTED = 3,
READONLY = 5,
NOTCONNECTED = 999
};

View File

@ -323,6 +323,9 @@ Coordination::Error ZooKeeper::tryCreate(const std::string & path, const std::st
{
Coordination::Error code = createImpl(path, data, mode, path_created);
if (code == Coordination::Error::ZNOTREADONLY && exists(path))
return Coordination::Error::ZNODEEXISTS;
if (!(code == Coordination::Error::ZOK ||
code == Coordination::Error::ZNONODE ||
code == Coordination::Error::ZNODEEXISTS ||
@ -345,6 +348,8 @@ void ZooKeeper::createIfNotExists(const std::string & path, const std::string &
if (code == Coordination::Error::ZOK || code == Coordination::Error::ZNODEEXISTS)
return;
else if (code == Coordination::Error::ZNOTREADONLY && exists(path))
return;
else
throw KeeperException::fromPath(code, path);
}

View File

@ -51,6 +51,7 @@ static constexpr int32_t KEEPER_PROTOCOL_VERSION_CONNECTION_REJECT = 42;
static constexpr int32_t CLIENT_HANDSHAKE_LENGTH = 44;
static constexpr int32_t CLIENT_HANDSHAKE_LENGTH_WITH_READONLY = 45;
static constexpr int32_t SERVER_HANDSHAKE_LENGTH = 36;
static constexpr int32_t SERVER_HANDSHAKE_LENGTH_WITH_READONLY = 37;
static constexpr int32_t PASSWORD_LENGTH = 16;
/// ZooKeeper has 1 MB node size and serialization limit by default,

View File

@ -1,3 +1,4 @@
#include "Common/ZooKeeper/ZooKeeperConstants.h"
#include <Common/ZooKeeper/ZooKeeperImpl.h>
#include <IO/Operators.h>
@ -552,12 +553,13 @@ void ZooKeeper::connect(
void ZooKeeper::sendHandshake()
{
int32_t handshake_length = 44;
int32_t handshake_length = 45;
int64_t last_zxid_seen = 0;
int32_t timeout = args.session_timeout_ms;
int64_t previous_session_id = 0; /// We don't support session restore. So previous session_id is always zero.
constexpr int32_t passwd_len = 16;
std::array<char, passwd_len> passwd {};
bool read_only = true;
write(handshake_length);
if (use_compression)
@ -568,6 +570,7 @@ void ZooKeeper::sendHandshake()
write(timeout);
write(previous_session_id);
write(passwd);
write(read_only);
flushWriteBuffer();
}
@ -577,9 +580,10 @@ void ZooKeeper::receiveHandshake()
int32_t protocol_version_read;
int32_t timeout;
std::array<char, PASSWORD_LENGTH> passwd;
bool read_only;
read(handshake_length);
if (handshake_length != SERVER_HANDSHAKE_LENGTH)
if (handshake_length != SERVER_HANDSHAKE_LENGTH && handshake_length != SERVER_HANDSHAKE_LENGTH_WITH_READONLY)
throw Exception(Error::ZMARSHALLINGERROR, "Unexpected handshake length received: {}", handshake_length);
read(protocol_version_read);
@ -607,6 +611,8 @@ void ZooKeeper::receiveHandshake()
read(session_id);
read(passwd);
if (handshake_length == SERVER_HANDSHAKE_LENGTH_WITH_READONLY)
read(read_only);
}

View File

@ -52,8 +52,7 @@ void Pool::Entry::decrementRefCount()
Pool::Pool(const Poco::Util::AbstractConfiguration & cfg, const std::string & config_name,
unsigned default_connections_, unsigned max_connections_,
const char * parent_config_name_)
: logger(Poco::Logger::get("mysqlxx::Pool"))
, default_connections(default_connections_)
: default_connections(default_connections_)
, max_connections(max_connections_)
{
server = cfg.getString(config_name + ".host");
@ -127,6 +126,38 @@ Pool::Pool(const Poco::Util::AbstractConfiguration & cfg, const std::string & co
}
Pool::Pool(
const std::string & db_,
const std::string & server_,
const std::string & user_,
const std::string & password_,
unsigned port_,
const std::string & socket_,
unsigned connect_timeout_,
unsigned rw_timeout_,
unsigned default_connections_,
unsigned max_connections_,
unsigned enable_local_infile_,
bool opt_reconnect_)
: default_connections(default_connections_)
, max_connections(max_connections_)
, db(db_)
, server(server_)
, user(user_)
, password(password_)
, port(port_)
, socket(socket_)
, connect_timeout(connect_timeout_)
, rw_timeout(rw_timeout_)
, enable_local_infile(enable_local_infile_)
, opt_reconnect(opt_reconnect_)
{
LOG_DEBUG(log,
"Created MySQL Pool with settings: connect_timeout={}, read_write_timeout={}, default_connections_number={}, max_connections_number={}",
connect_timeout, rw_timeout, default_connections, max_connections);
}
Pool::~Pool()
{
std::lock_guard lock(mutex);
@ -148,29 +179,29 @@ Pool::Entry Pool::get(uint64_t wait_timeout)
initialize();
for (;;)
{
logger.trace("(%s): Iterating through existing MySQL connections", getDescription());
LOG_TRACE(log, "{}: Iterating through existing MySQL connections", getDescription());
for (auto & connection : connections)
{
if (connection->ref_count == 0)
{
logger.test("Found free connection in pool, returning it to the caller");
LOG_TEST(log, "Found free connection in pool, returning it to the caller");
return Entry(connection, this);
}
}
logger.trace("(%s): Trying to allocate a new connection.", getDescription());
LOG_TRACE(log, "{}: Trying to allocate a new connection.", getDescription());
if (connections.size() < static_cast<size_t>(max_connections))
{
Connection * conn = allocConnection();
if (conn)
return Entry(conn, this);
logger.trace("(%s): Unable to create a new connection: Allocation failed.", getDescription());
LOG_TRACE(log, "{}: Unable to create a new connection: Allocation failed.", getDescription());
}
else
{
logger.trace("(%s): Unable to create a new connection: Max number of connections has been reached.", getDescription());
LOG_TRACE(log, "{}: Unable to create a new connection: Max number of connections has been reached.", getDescription());
}
if (!wait_timeout)
@ -180,7 +211,7 @@ Pool::Entry Pool::get(uint64_t wait_timeout)
throw Poco::Exception("mysqlxx::Pool is full (connection_wait_timeout is exceeded)");
lock.unlock();
logger.trace("(%s): Sleeping for %d seconds.", getDescription(), MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
LOG_TRACE(log, "{}: Sleeping for {} seconds.", getDescription(), MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
sleepForSeconds(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
lock.lock();
}
@ -206,7 +237,7 @@ Pool::Entry Pool::tryGet()
return res;
}
logger.debug("(%s): Idle connection to MySQL server cannot be recovered, dropping it.", getDescription());
LOG_DEBUG(log, "{}: Idle connection to MySQL server cannot be recovered, dropping it.", getDescription());
/// This one is disconnected, cannot be reestablished and so needs to be disposed of.
connection_it = connections.erase(connection_it);
@ -229,7 +260,7 @@ Pool::Entry Pool::tryGet()
void Pool::removeConnection(Connection* connection)
{
logger.trace("(%s): Removing connection.", getDescription());
LOG_TRACE(log, "{}: Removing connection.", getDescription());
std::lock_guard lock(mutex);
if (connection)
@ -260,8 +291,8 @@ void Pool::Entry::forceConnected() const
else
sleepForSeconds(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
pool->logger.debug(
"Creating a new MySQL connection to %s with settings: connect_timeout=%u, read_write_timeout=%u",
LOG_DEBUG(pool->log,
"Creating a new MySQL connection to {} with settings: connect_timeout={}, read_write_timeout={}",
pool->description, pool->connect_timeout, pool->rw_timeout);
data->conn.connect(
@ -287,21 +318,21 @@ bool Pool::Entry::tryForceConnected() const
auto * const mysql_driver = data->conn.getDriver();
const auto prev_connection_id = mysql_thread_id(mysql_driver);
pool->logger.trace("Entry(connection %lu): sending PING to check if it is alive.", prev_connection_id);
LOG_TRACE(pool->log, "Entry(connection {}): sending PING to check if it is alive.", prev_connection_id);
if (data->conn.ping()) /// Attempts to reestablish lost connection
{
const auto current_connection_id = mysql_thread_id(mysql_driver);
if (prev_connection_id != current_connection_id)
{
pool->logger.debug("Entry(connection %lu): Reconnected to MySQL server. Connection id changed: %lu -> %lu",
current_connection_id, prev_connection_id, current_connection_id);
LOG_DEBUG(pool->log, "Entry(connection {}): Reconnected to MySQL server. Connection id changed: {} -> {}",
current_connection_id, prev_connection_id, current_connection_id);
}
pool->logger.trace("Entry(connection %lu): PING ok.", current_connection_id);
LOG_TRACE(pool->log, "Entry(connection {}): PING ok.", current_connection_id);
return true;
}
pool->logger.trace("Entry(connection %lu): PING failed.", prev_connection_id);
LOG_TRACE(pool->log, "Entry(connection {}): PING failed.", prev_connection_id);
return false;
}
@ -326,10 +357,10 @@ Pool::Connection * Pool::allocConnection(bool dont_throw_if_failed_first_time)
try
{
logger.debug("Connecting to %s", description);
LOG_DEBUG(log, "Connecting to {}", description);
logger.debug(
"Creating a new MySQL connection to %s with settings: connect_timeout=%u, read_write_timeout=%u",
LOG_DEBUG(log,
"Creating a new MySQL connection to {} with settings: connect_timeout={}, read_write_timeout={}",
description, connect_timeout, rw_timeout);
conn_ptr->conn.connect(
@ -349,7 +380,7 @@ Pool::Connection * Pool::allocConnection(bool dont_throw_if_failed_first_time)
}
catch (mysqlxx::ConnectionFailed & e)
{
logger.error(e.what());
LOG_ERROR(log, "Failed to connect to MySQL ({}): {}", description, e.what());
if ((!was_successful && !dont_throw_if_failed_first_time)
|| e.errnum() == ER_ACCESS_DENIED_ERROR

View File

@ -169,28 +169,10 @@ public:
unsigned default_connections_ = MYSQLXX_POOL_DEFAULT_START_CONNECTIONS,
unsigned max_connections_ = MYSQLXX_POOL_DEFAULT_MAX_CONNECTIONS,
unsigned enable_local_infile_ = MYSQLXX_DEFAULT_ENABLE_LOCAL_INFILE,
bool opt_reconnect_ = MYSQLXX_DEFAULT_MYSQL_OPT_RECONNECT)
: logger(Poco::Logger::get("mysqlxx::Pool"))
, default_connections(default_connections_)
, max_connections(max_connections_)
, db(db_)
, server(server_)
, user(user_)
, password(password_)
, port(port_)
, socket(socket_)
, connect_timeout(connect_timeout_)
, rw_timeout(rw_timeout_)
, enable_local_infile(enable_local_infile_)
, opt_reconnect(opt_reconnect_)
{
logger.debug(
"Created MySQL Pool with settings: connect_timeout=%u, read_write_timeout=%u, default_connections_number=%u, max_connections_number=%u",
connect_timeout, rw_timeout, default_connections, max_connections);
}
bool opt_reconnect_ = MYSQLXX_DEFAULT_MYSQL_OPT_RECONNECT);
Pool(const Pool & other)
: logger(other.logger), default_connections{other.default_connections},
: default_connections{other.default_connections},
max_connections{other.max_connections},
db{other.db}, server{other.server},
user{other.user}, password{other.password},
@ -220,7 +202,7 @@ public:
void removeConnection(Connection * connection);
protected:
Poco::Logger & logger;
Poco::Logger * log = &Poco::Logger::get("mysqlxx::Pool");
/// Number of MySQL connections which are created at launch.
unsigned default_connections;

View File

@ -24,6 +24,41 @@ namespace DB
}
namespace
{
class Events
{
public:
Events() : start_time(std::chrono::steady_clock::now()) {}
void add(String && event, std::chrono::milliseconds correction = std::chrono::milliseconds::zero())
{
String timepoint = std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time).count());
if (timepoint.length() < 5)
timepoint.insert(0, 5 - timepoint.length(), ' ');
if (correction.count())
std::this_thread::sleep_for(correction);
std::lock_guard lock{mutex};
//std::cout << timepoint << " : " << event << std::endl;
events.emplace_back(std::move(event));
}
void check(const Strings & expected_events)
{
std::lock_guard lock{mutex};
EXPECT_EQ(events.size(), expected_events.size());
for (size_t i = 0; i != events.size(); ++i)
EXPECT_EQ(events[i], (i < expected_events.size() ? expected_events[i] : ""));
}
private:
const std::chrono::time_point<std::chrono::steady_clock> start_time;
Strings events TSA_GUARDED_BY(mutex);
mutable std::mutex mutex;
};
}
TEST(Common, RWLock1)
{
/// Tests with threads require this, because otherwise
@ -287,3 +322,260 @@ TEST(Common, RWLockNotUpgradeableWithNoQuery)
read_thread.join();
}
TEST(Common, RWLockWriteLockTimeoutDuringRead)
{
/// 0 100 200 300 400
/// <---------------------------------------- ra ---------------------------------------------->
/// <----- wc (acquiring lock, failed by timeout) ----->
/// <wd>
///
/// 0 : Locking ra
/// 0 : Locked ra
/// 100 : Locking wc
/// 300 : Failed to lock wc
/// 400 : Unlocking ra
/// 400 : Unlocked ra
/// 400 : Locking wd
/// 400 : Locked wd
/// 400 : Unlocking wd
/// 400 : Unlocked wd
static auto rw_lock = RWLockImpl::create();
Events events;
std::thread ra_thread([&] ()
{
events.add("Locking ra");
auto ra = rw_lock->getLock(RWLockImpl::Read, "ra");
events.add(ra ? "Locked ra" : "Failed to lock ra");
EXPECT_NE(ra, nullptr);
std::this_thread::sleep_for(std::chrono::duration<int, std::milli>(400));
events.add("Unlocking ra");
ra.reset();
events.add("Unlocked ra");
});
std::thread wc_thread([&] ()
{
std::this_thread::sleep_for(std::chrono::duration<int, std::milli>(100));
events.add("Locking wc");
auto wc = rw_lock->getLock(RWLockImpl::Write, "wc", std::chrono::milliseconds(200));
events.add(wc ? "Locked wc" : "Failed to lock wc");
EXPECT_EQ(wc, nullptr);
});
ra_thread.join();
wc_thread.join();
{
events.add("Locking wd");
auto wd = rw_lock->getLock(RWLockImpl::Write, "wd", std::chrono::milliseconds(1000));
events.add(wd ? "Locked wd" : "Failed to lock wd");
EXPECT_NE(wd, nullptr);
events.add("Unlocking wd");
wd.reset();
events.add("Unlocked wd");
}
events.check(
{"Locking ra",
"Locked ra",
"Locking wc",
"Failed to lock wc",
"Unlocking ra",
"Unlocked ra",
"Locking wd",
"Locked wd",
"Unlocking wd",
"Unlocked wd"});
}
TEST(Common, RWLockWriteLockTimeoutDuringTwoReads)
{
/// 0 100 200 300 400 500
/// <---------------------------------------- ra ----------------------------------------------->
/// <------ wc (acquiring lock, failed by timeout) ------->
/// <-- rb (acquiring lock) --><---------- rb (locked) ------------>
/// <wd>
///
/// 0 : Locking ra
/// 0 : Locked ra
/// 100 : Locking wc
/// 200 : Locking rb
/// 300 : Failed to lock wc
/// 300 : Locked rb
/// 400 : Unlocking ra
/// 400 : Unlocked ra
/// 500 : Unlocking rb
/// 500 : Unlocked rb
/// 501 : Locking wd
/// 501 : Locked wd
/// 501 : Unlocking wd
/// 501 : Unlocked wd
static auto rw_lock = RWLockImpl::create();
Events events;
std::thread ra_thread([&] ()
{
events.add("Locking ra");
auto ra = rw_lock->getLock(RWLockImpl::Read, "ra");
events.add(ra ? "Locked ra" : "Failed to lock ra");
EXPECT_NE(ra, nullptr);
std::this_thread::sleep_for(std::chrono::duration<int, std::milli>(400));
events.add("Unlocking ra");
ra.reset();
events.add("Unlocked ra");
});
std::thread rb_thread([&] ()
{
std::this_thread::sleep_for(std::chrono::duration<int, std::milli>(200));
events.add("Locking rb");
auto rb = rw_lock->getLock(RWLockImpl::Read, "rb");
/// `correction` is used here to add an event to `events` a little later.
/// (Because the event "Locked rb" happens at nearly the same time as "Failed to lock wc" and we don't want our test to be flaky.)
auto correction = std::chrono::duration<int, std::milli>(50);
events.add(rb ? "Locked rb" : "Failed to lock rb", correction);
EXPECT_NE(rb, nullptr);
std::this_thread::sleep_for(std::chrono::duration<int, std::milli>(200) - correction);
events.add("Unlocking rb");
rb.reset();
events.add("Unlocked rb");
});
std::thread wc_thread([&] ()
{
std::this_thread::sleep_for(std::chrono::duration<int, std::milli>(100));
events.add("Locking wc");
auto wc = rw_lock->getLock(RWLockImpl::Write, "wc", std::chrono::milliseconds(200));
events.add(wc ? "Locked wc" : "Failed to lock wc");
EXPECT_EQ(wc, nullptr);
});
ra_thread.join();
rb_thread.join();
wc_thread.join();
{
events.add("Locking wd");
auto wd = rw_lock->getLock(RWLockImpl::Write, "wd", std::chrono::milliseconds(1000));
events.add(wd ? "Locked wd" : "Failed to lock wd");
EXPECT_NE(wd, nullptr);
events.add("Unlocking wd");
wd.reset();
events.add("Unlocked wd");
}
events.check(
{"Locking ra",
"Locked ra",
"Locking wc",
"Locking rb",
"Failed to lock wc",
"Locked rb",
"Unlocking ra",
"Unlocked ra",
"Unlocking rb",
"Unlocked rb",
"Locking wd",
"Locked wd",
"Unlocking wd",
"Unlocked wd"});
}
TEST(Common, RWLockWriteLockTimeoutDuringWriteWithWaitingRead)
{
/// 0 100 200 300 400 500
/// <--------------------------------------------------- wa -------------------------------------------------------->
/// <------ wb (acquiring lock, failed by timeout) ------>
/// <-- rc (acquiring lock, failed by timeout) -->
/// <wd>
///
/// 0 : Locking wa
/// 0 : Locked wa
/// 100 : Locking wb
/// 200 : Locking rc
/// 300 : Failed to lock wb
/// 400 : Failed to lock rc
/// 500 : Unlocking wa
/// 500 : Unlocked wa
/// 501 : Locking wd
/// 501 : Locked wd
/// 501 : Unlocking wd
/// 501 : Unlocked wd
static auto rw_lock = RWLockImpl::create();
Events events;
std::thread wa_thread([&] ()
{
events.add("Locking wa");
auto wa = rw_lock->getLock(RWLockImpl::Write, "wa");
events.add(wa ? "Locked wa" : "Failed to lock wa");
EXPECT_NE(wa, nullptr);
std::this_thread::sleep_for(std::chrono::duration<int, std::milli>(500));
events.add("Unlocking wa");
wa.reset();
events.add("Unlocked wa");
});
std::thread wb_thread([&] ()
{
std::this_thread::sleep_for(std::chrono::duration<int, std::milli>(100));
events.add("Locking wb");
auto wc = rw_lock->getLock(RWLockImpl::Write, "wc", std::chrono::milliseconds(200));
events.add(wc ? "Locked wb" : "Failed to lock wb");
EXPECT_EQ(wc, nullptr);
});
std::thread rc_thread([&] ()
{
std::this_thread::sleep_for(std::chrono::duration<int, std::milli>(200));
events.add("Locking rc");
auto rc = rw_lock->getLock(RWLockImpl::Read, "rc", std::chrono::milliseconds(200));
events.add(rc ? "Locked rc" : "Failed to lock rc");
EXPECT_EQ(rc, nullptr);
});
wa_thread.join();
wb_thread.join();
rc_thread.join();
{
events.add("Locking wd");
auto wd = rw_lock->getLock(RWLockImpl::Write, "wd", std::chrono::milliseconds(1000));
events.add(wd ? "Locked wd" : "Failed to lock wd");
EXPECT_NE(wd, nullptr);
events.add("Unlocking wd");
wd.reset();
events.add("Unlocked wd");
}
events.check(
{"Locking wa",
"Locked wa",
"Locking wb",
"Locking rc",
"Failed to lock wb",
"Failed to lock rc",
"Unlocking wa",
"Unlocked wa",
"Locking wd",
"Locked wd",
"Unlocking wd",
"Unlocked wd"});
}

View File

@ -43,6 +43,7 @@ struct Settings;
M(UInt64, max_requests_batch_bytes_size, 100*1024, "Max size in bytes of batch of requests that can be sent to RAFT", 0) \
M(UInt64, max_flush_batch_size, 1000, "Max size of batch of requests that can be flushed together", 0) \
M(UInt64, max_requests_quick_batch_size, 100, "Max size of batch of requests to try to get before proceeding with RAFT. Keeper will not wait for requests but take only requests that are already in queue" , 0) \
M(UInt64, max_memory_usage_soft_limit, 0, "Soft limit in bytes of keeper memory usage", 0) \
M(Bool, quorum_reads, false, "Execute read requests as writes through whole RAFT consesus with similar speed", 0) \
M(Bool, force_sync, true, "Call fsync on each change in RAFT changelog", 0) \
M(Bool, compress_logs, false, "Write compressed coordination logs in ZSTD format", 0) \

View File

@ -113,6 +113,12 @@ KeeperAsynchronousMetrics::KeeperAsynchronousMetrics(
{
}
KeeperAsynchronousMetrics::~KeeperAsynchronousMetrics()
{
/// NOTE: stop() from base class is not enough, since this leads to leak on vptr
stop();
}
void KeeperAsynchronousMetrics::updateImpl(AsynchronousMetricValues & new_values, TimePoint /*update_time*/, TimePoint /*current_time*/)
{
#if USE_NURAFT

View File

@ -14,6 +14,7 @@ class KeeperAsynchronousMetrics : public AsynchronousMetrics
public:
KeeperAsynchronousMetrics(
ContextPtr context_, int update_period_seconds, const ProtocolServerMetricsFunc & protocol_server_metrics_func_);
~KeeperAsynchronousMetrics() override;
private:
ContextPtr context;

View File

@ -51,6 +51,56 @@ namespace ErrorCodes
extern const int SYSTEM_ERROR;
}
namespace
{
bool checkIfRequestIncreaseMem(const Coordination::ZooKeeperRequestPtr & request)
{
if (request->getOpNum() == Coordination::OpNum::Create
|| request->getOpNum() == Coordination::OpNum::CreateIfNotExists
|| request->getOpNum() == Coordination::OpNum::Set)
{
return true;
}
else if (request->getOpNum() == Coordination::OpNum::Multi)
{
Coordination::ZooKeeperMultiRequest & multi_req = dynamic_cast<Coordination::ZooKeeperMultiRequest &>(*request);
Int64 memory_delta = 0;
for (const auto & sub_req : multi_req.requests)
{
auto sub_zk_request = std::dynamic_pointer_cast<Coordination::ZooKeeperRequest>(sub_req);
switch (sub_zk_request->getOpNum())
{
case Coordination::OpNum::Create:
case Coordination::OpNum::CreateIfNotExists:
{
Coordination::ZooKeeperCreateRequest & create_req = dynamic_cast<Coordination::ZooKeeperCreateRequest &>(*sub_zk_request);
memory_delta += create_req.bytesSize();
break;
}
case Coordination::OpNum::Set:
{
Coordination::ZooKeeperSetRequest & set_req = dynamic_cast<Coordination::ZooKeeperSetRequest &>(*sub_zk_request);
memory_delta += set_req.bytesSize();
break;
}
case Coordination::OpNum::Remove:
{
Coordination::ZooKeeperRemoveRequest & remove_req = dynamic_cast<Coordination::ZooKeeperRemoveRequest &>(*sub_zk_request);
memory_delta -= remove_req.bytesSize();
break;
}
default:
break;
}
}
return memory_delta > 0;
}
return false;
}
}
KeeperDispatcher::KeeperDispatcher()
: responses_queue(std::numeric_limits<size_t>::max())
@ -93,6 +143,14 @@ void KeeperDispatcher::requestThread()
if (shutdown_called)
break;
Int64 mem_soft_limit = configuration_and_settings->coordination_settings->max_memory_usage_soft_limit;
if (configuration_and_settings->standalone_keeper && mem_soft_limit > 0 && total_memory_tracker.get() >= mem_soft_limit && checkIfRequestIncreaseMem(request.request))
{
LOG_TRACE(log, "Processing requests refused because of max_memory_usage_soft_limit {}, the total used memory is {}, request type is {}", mem_soft_limit, total_memory_tracker.get(), request.request->getOpNum());
addErrorResponses({request}, Coordination::Error::ZCONNECTIONLOSS);
continue;
}
KeeperStorage::RequestsForSessions current_batch;
size_t current_batch_bytes_size = 0;

View File

@ -4,6 +4,7 @@
#include <Common/Macros.h>
#include <Common/ThreadPool.h>
#include <Common/callOnce.h>
#include <Disks/IO/IOUringReader.h>
#include <Core/ServerSettings.h>
@ -62,6 +63,11 @@ struct ContextSharedPart : boost::noncopyable
mutable std::unique_ptr<IAsynchronousReader> asynchronous_local_fs_reader;
mutable std::unique_ptr<IAsynchronousReader> synchronous_local_fs_reader;
#if USE_LIBURING
mutable OnceFlag io_uring_reader_initialized;
mutable std::unique_ptr<IOUringReader> io_uring_reader;
#endif
mutable OnceFlag threadpool_writer_initialized;
mutable std::unique_ptr<ThreadPool> threadpool_writer;
@ -225,6 +231,17 @@ IAsynchronousReader & Context::getThreadPoolReader(FilesystemReaderType type) co
}
}
#if USE_LIBURING
IOUringReader & Context::getIOURingReader() const
{
callOnce(shared->io_uring_reader_initialized, [&] {
shared->io_uring_reader = std::make_unique<IOUringReader>(512);
});
return *shared->io_uring_reader;
}
#endif
std::shared_ptr<FilesystemCacheLog> Context::getFilesystemCacheLog() const
{
return nullptr;

View File

@ -20,6 +20,8 @@
#include <memory>
#include "config.h"
namespace DB
{
@ -28,6 +30,7 @@ class Macros;
class FilesystemCacheLog;
class FilesystemReadPrefetchesLog;
class BlobStorageLog;
class IOUringReader;
/// A small class which owns ContextShared.
/// We don't use something like unique_ptr directly to allow ContextShared type to be incomplete.
@ -127,6 +130,9 @@ public:
ApplicationType getApplicationType() const { return ApplicationType::KEEPER; }
IAsynchronousReader & getThreadPoolReader(FilesystemReaderType type) const;
#if USE_LIBURING
IOUringReader & getIOURingReader() const;
#endif
std::shared_ptr<AsyncReadCounters> getAsyncReadCounters() const;
ThreadPool & getThreadPoolWriter() const;

View File

@ -156,6 +156,7 @@ class IColumn;
M(Bool, allow_suspicious_low_cardinality_types, false, "In CREATE TABLE statement allows specifying LowCardinality modifier for types of small fixed size (8 or less). Enabling this may increase merge times and memory consumption.", 0) \
M(Bool, allow_suspicious_fixed_string_types, false, "In CREATE TABLE statement allows creating columns of type FixedString(n) with n > 256. FixedString with length >= 256 is suspicious and most likely indicates misusage", 0) \
M(Bool, allow_suspicious_indices, false, "Reject primary/secondary indexes and sorting keys with identical expressions", 0) \
M(Bool, allow_suspicious_ttl_expressions, false, "Reject TTL expressions that don't depend on any of table's columns. It indicates a user error most of the time.", 0) \
M(Bool, compile_expressions, false, "Compile some scalar functions and operators to native code.", 0) \
M(UInt64, min_count_to_compile_expression, 3, "The number of identical expressions before they are JIT-compiled", 0) \
M(Bool, compile_aggregate_expressions, true, "Compile aggregate functions to native code.", 0) \

View File

@ -7,6 +7,7 @@
#include <boost/algorithm/string.hpp>
#include <map>
namespace DB
{
@ -80,6 +81,7 @@ namespace SettingsChangesHistory
/// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> settings_changes_history =
{
{"23.12", {{"allow_suspicious_ttl_expressions", true, false, "It is a new setting, and in previous versions the behavior was equivalent to allowing."}}},
{"23.9", {{"optimize_group_by_constant_keys", false, true, "Optimize group by constant keys by default"},
{"input_format_json_try_infer_named_tuples_from_objects", false, true, "Try to infer named Tuples from JSON objects by default"},
{"input_format_json_read_numbers_as_strings", false, true, "Allow to read numbers as strings in JSON formats by default"},

View File

@ -89,15 +89,14 @@ void DatabaseAtomic::drop(ContextPtr)
fs::remove_all(getMetadataPath());
}
void DatabaseAtomic::attachTable(ContextPtr /* context_ */, const String & name, const StoragePtr & table, const String & relative_table_path)
void DatabaseAtomic::attachTableUnlocked(ContextPtr local_context, const String & name, const StoragePtr & table, const String & relative_table_path)
{
assert(relative_table_path != data_path && !relative_table_path.empty());
DetachedTables not_in_use;
std::lock_guard lock(mutex);
not_in_use = cleanupDetachedTables();
auto table_id = table->getStorageID();
assertDetachedTableNotInUse(table_id.uuid);
DatabaseOrdinary::attachTableUnlocked(name, table);
DatabaseOrdinary::attachTableUnlocked(local_context, name, table, relative_table_path);
table_name_to_path.emplace(std::make_pair(name, relative_table_path));
}
@ -325,7 +324,7 @@ void DatabaseAtomic::commitCreateTable(const ASTCreateQuery & query, const Stora
/// It throws if `table_metadata_path` already exists (it's possible if table was detached)
renameNoReplace(table_metadata_tmp_path, table_metadata_path); /// Commit point (a sort of)
attachTableUnlocked(query.getTable(), table); /// Should never throw
DatabaseWithOwnTablesBase::attachTableUnlocked(query_context, query.getTable(), table, /*relative_table_path=*/ {}); /// Should never throw
table_name_to_path.emplace(query.getTable(), table_data_path);
}
catch (...)

View File

@ -38,7 +38,6 @@ public:
void dropTable(ContextPtr context, const String & table_name, bool sync) override;
void dropTableImpl(ContextPtr context, const String & table_name, bool sync);
void attachTable(ContextPtr context, const String & name, const StoragePtr & table, const String & relative_table_path) override;
StoragePtr detachTable(ContextPtr context, const String & name) override;
String getTableDataPath(const String & table_name) const override;
@ -66,6 +65,8 @@ public:
void setDetachedTableNotInUseForce(const UUID & uuid) override;
protected:
void attachTableUnlocked(ContextPtr local_context, const String & name, const StoragePtr & table, const String & relative_table_path) TSA_REQUIRES(mutex) override;
void commitAlterTable(const StorageID & table_id, const String & table_metadata_tmp_path, const String & table_metadata_path, const String & statement, ContextPtr query_context) override;
void commitCreateTable(const ASTCreateQuery & query, const StoragePtr & table,
const String & table_metadata_tmp_path, const String & table_metadata_path, ContextPtr query_context) override;

View File

@ -161,10 +161,9 @@ bool DatabaseLazy::empty() const
return tables_cache.empty();
}
void DatabaseLazy::attachTable(ContextPtr /* context_ */, const String & table_name, const StoragePtr & table, const String &)
void DatabaseLazy::attachTableUnlocked(ContextPtr /* context_ */, const String & table_name, const StoragePtr & table, const String &)
{
LOG_DEBUG(log, "Attach table {}.", backQuote(table_name));
std::lock_guard lock(mutex);
time_t current_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
auto [it, inserted] = tables_cache.emplace(std::piecewise_construct,

View File

@ -64,14 +64,15 @@ public:
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
void attachTable(ContextPtr context, const String & table_name, const StoragePtr & table, const String & relative_table_path) override;
StoragePtr detachTable(ContextPtr context, const String & table_name) override;
void shutdown() override;
~DatabaseLazy() override;
protected:
void attachTableUnlocked(ContextPtr context, const String & table_name, const StoragePtr & table, const String & relative_table_path) TSA_REQUIRES(mutex) override;
private:
struct CacheExpirationQueueElement
{

View File

@ -33,13 +33,13 @@ DatabaseMemory::DatabaseMemory(const String & name_, ContextPtr context_)
}
void DatabaseMemory::createTable(
ContextPtr /*context*/,
ContextPtr local_context,
const String & table_name,
const StoragePtr & table,
const ASTPtr & query)
{
std::lock_guard lock{mutex};
attachTableUnlocked(table_name, table);
attachTableUnlocked(local_context, table_name, table, /*relative_table_path=*/ {});
/// Clean the query from temporary flags.
ASTPtr query_to_store = query;
@ -56,7 +56,7 @@ void DatabaseMemory::createTable(
}
void DatabaseMemory::dropTable(
ContextPtr /*context*/,
ContextPtr local_context,
const String & table_name,
bool /*sync*/)
{
@ -83,7 +83,7 @@ void DatabaseMemory::dropTable(
catch (...)
{
std::lock_guard lock{mutex};
attachTableUnlocked(table_name, table);
attachTableUnlocked(local_context, table_name, table, /*relative_table_path=*/ {});
throw;
}

View File

@ -7,6 +7,7 @@
#include <Parsers/formatAST.h>
#include <Storages/StorageDictionary.h>
#include <Storages/StorageFactory.h>
#include <Common/quoteString.h>
#include <Common/typeid_cast.h>
#include <Common/escapeForFileName.h>
#include <TableFunctions/TableFunctionFactory.h>
@ -193,7 +194,7 @@ DatabaseWithOwnTablesBase::DatabaseWithOwnTablesBase(const String & name_, const
bool DatabaseWithOwnTablesBase::isTableExist(const String & table_name, ContextPtr) const
{
std::lock_guard lock(mutex);
return tables.find(table_name) != tables.end();
return tables.find(table_name) != tables.end() || lazy_tables.find(table_name) != lazy_tables.end();
}
StoragePtr DatabaseWithOwnTablesBase::tryGetTable(const String & table_name, ContextPtr) const
@ -205,6 +206,9 @@ StoragePtr DatabaseWithOwnTablesBase::tryGetTable(const String & table_name, Con
DatabaseTablesIteratorPtr DatabaseWithOwnTablesBase::getTablesIterator(ContextPtr, const FilterByNameFunction & filter_by_table_name) const
{
std::lock_guard lock(mutex);
loadLazyTables();
if (!filter_by_table_name)
return std::make_unique<DatabaseTablesSnapshotIterator>(tables, database_name);
@ -250,13 +254,7 @@ StoragePtr DatabaseWithOwnTablesBase::detachTableUnlocked(const String & table_n
return res;
}
void DatabaseWithOwnTablesBase::attachTable(ContextPtr /* context_ */, const String & table_name, const StoragePtr & table, const String &)
{
std::lock_guard lock(mutex);
attachTableUnlocked(table_name, table);
}
void DatabaseWithOwnTablesBase::attachTableUnlocked(const String & table_name, const StoragePtr & table)
void DatabaseWithOwnTablesBase::attachTableUnlocked(ContextPtr, const String & name, const StoragePtr & table, const String &)
{
auto table_id = table->getStorageID();
if (table_id.database_name != database_name)
@ -269,7 +267,7 @@ void DatabaseWithOwnTablesBase::attachTableUnlocked(const String & table_name, c
DatabaseCatalog::instance().addUUIDMapping(table_id.uuid, shared_from_this(), table);
}
if (!tables.emplace(table_name, table).second)
if (!tables.emplace(name, table).second)
{
if (table_id.hasUUID())
DatabaseCatalog::instance().removeUUIDMapping(table_id.uuid);
@ -281,6 +279,12 @@ void DatabaseWithOwnTablesBase::attachTableUnlocked(const String & table_name, c
table->is_detached = false;
}
void DatabaseWithOwnTablesBase::registerLazyTableUnlocked(const String & table_name, LazyTableCreator table_creator, const String & relative_table_path)
{
if (!lazy_tables.emplace(table_name, std::make_pair(relative_table_path, std::move(table_creator))).second)
throw Exception(ErrorCodes::TABLE_ALREADY_EXISTS, "Table {} already registered.", table_name);
}
void DatabaseWithOwnTablesBase::shutdown()
{
/// You can not hold a lock during shutdown.
@ -381,10 +385,45 @@ void DatabaseWithOwnTablesBase::createTableRestoredFromBackup(const ASTPtr & cre
StoragePtr DatabaseWithOwnTablesBase::tryGetTableNoWait(const String & table_name) const
{
std::lock_guard lock(mutex);
auto it = tables.find(table_name);
if (it != tables.end())
return it->second;
const auto lazy_it = lazy_tables.find(table_name);
if (lazy_it != lazy_tables.end())
{
LOG_DEBUG(log, "Attaching lazy table {}", backQuoteIfNeed(table_name));
auto relative_table_path = lazy_it->second.first;
auto storage = lazy_it->second.second();
lazy_tables.erase(lazy_it);
(const_cast<DatabaseWithOwnTablesBase *>(this))->attachTableUnlocked(Context::getGlobalContextInstance(), table_name, storage, relative_table_path);
it = tables.find(table_name);
if (it != tables.end())
return it->second;
}
return {};
}
void DatabaseWithOwnTablesBase::loadLazyTables() const
{
if (lazy_tables.empty())
return;
ContextPtr global_context = Context::getGlobalContextInstance();
while (!lazy_tables.empty())
{
auto lazy_it = lazy_tables.begin();
const auto table_name = lazy_it->first;
LOG_DEBUG(log, "Attaching lazy table {}", backQuoteIfNeed(table_name));
auto relative_table_path = lazy_it->second.first;
auto storage = lazy_it->second.second();
lazy_tables.erase(lazy_it);
(const_cast<DatabaseWithOwnTablesBase *>(this))->attachTableUnlocked(global_context, table_name, storage, relative_table_path);
}
}
}

View File

@ -30,8 +30,6 @@ public:
bool empty() const override;
void attachTable(ContextPtr context, const String & table_name, const StoragePtr & table, const String & relative_table_path) override;
StoragePtr detachTable(ContextPtr context, const String & table_name) override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
@ -45,14 +43,19 @@ public:
protected:
Tables tables TSA_GUARDED_BY(mutex);
/// Tables that are attached lazily
mutable LazyTables lazy_tables TSA_GUARDED_BY(mutex);
Poco::Logger * log;
DatabaseWithOwnTablesBase(const String & name_, const String & logger, ContextPtr context);
void attachTableUnlocked(const String & table_name, const StoragePtr & table) TSA_REQUIRES(mutex);
void attachTableUnlocked(ContextPtr context, const String & name, const StoragePtr & table, const String & relative_table_path) TSA_REQUIRES(mutex) override;
void registerLazyTableUnlocked(const String & table_name, LazyTableCreator table_creator, const String & relative_table_path) TSA_REQUIRES(mutex) override;
StoragePtr detachTableUnlocked(const String & table_name) TSA_REQUIRES(mutex);
StoragePtr getTableUnlocked(const String & table_name) const TSA_REQUIRES(mutex);
StoragePtr tryGetTableNoWait(const String & table_name) const;
void loadLazyTables() const TSA_REQUIRES(mutex);
};
}

View File

@ -46,4 +46,20 @@ void IDatabase::createTableRestoredFromBackup(const ASTPtr & create_table_query,
backQuoteIfNeed(create_table_query->as<const ASTCreateQuery &>().getTable()));
}
/// Add a table to the database, but do not add it to the metadata. The database may not support this method.
///
/// Note: ATTACH TABLE statement actually uses createTable method.
void IDatabase::attachTable(ContextPtr context, const String & name, const StoragePtr & table, const String & relative_table_path) /// NOLINT
{
std::lock_guard lock(mutex);
attachTableUnlocked(context, name, table, relative_table_path);
}
void IDatabase::registerLazyTable(ContextPtr, const String & table_name, LazyTableCreator table_creator, const String & relative_table_path) /// NOLINT
{
std::lock_guard lock(mutex);
registerLazyTableUnlocked(table_name, std::move(table_creator), relative_table_path);
}
}

View File

@ -125,7 +125,6 @@ public:
using DatabaseTablesIteratorPtr = std::unique_ptr<IDatabaseTablesIterator>;
/** Database engine.
* It is responsible for:
* - initialization of set of known tables and dictionaries;
@ -138,6 +137,10 @@ using DatabaseTablesIteratorPtr = std::unique_ptr<IDatabaseTablesIterator>;
class IDatabase : public std::enable_shared_from_this<IDatabase>
{
public:
using LazyTableCreator = std::function<StoragePtr()>;
/// Map{table_name, Pair{relative_table_path, LazyTableCreator}}
using LazyTables = std::map<String, std::pair<String, LazyTableCreator>>;
IDatabase() = delete;
explicit IDatabase(String database_name_) : database_name(std::move(database_name_)) {}
@ -269,11 +272,17 @@ public:
/// Add a table to the database, but do not add it to the metadata. The database may not support this method.
///
/// Note: ATTACH TABLE statement actually uses createTable method.
virtual void attachTable(ContextPtr /* context */, const String & /*name*/, const StoragePtr & /*table*/, [[maybe_unused]] const String & relative_table_path = {}) /// NOLINT
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "There is no ATTACH TABLE query for Database{}", getEngineName());
}
/// @param relative_table_path - only for Atomic engine
///
/// Note:
/// - ATTACH TABLE statement actually uses createTable method.
/// - Instead of overriding this method you should override attachTableUnlocked()
/// (This method is only for DatabasesOverlay to override)
virtual void attachTable(ContextPtr context, const String & name, const StoragePtr & table, const String & relative_table_path = {}); /// NOLINT
/// Register tables lazily (attach will be done only when the table will be used) instead of attaching it.
/// This is needed to improve startup time of clickhouse-local.
virtual void registerLazyTable(ContextPtr context, const String & table_name, LazyTableCreator table_creator, const String & relative_table_path = {});
/// Forget about the table without deleting it, and return it. The database may not support this method.
virtual StoragePtr detachTable(ContextPtr /* context */, const String & /*name*/)
@ -430,6 +439,16 @@ protected:
return nullptr;
}
virtual void attachTableUnlocked(ContextPtr /*context*/, const String & /*name*/, const StoragePtr & /*table*/, const String & /*relative_table_path*/ = {}) TSA_REQUIRES(mutex) /// NOLINT
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "There is no ATTACH TABLE query for Database{}", getEngineName());
}
virtual void registerLazyTableUnlocked(const String & /* table_name */, LazyTableCreator /* table_creator */, const String & /* relative_table_path */) TSA_REQUIRES(mutex) /// NOLINT
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "There lazy table initialization support for Database{}", getEngineName());
}
mutable std::mutex mutex;
String database_name TSA_GUARDED_BY(mutex);
String comment TSA_GUARDED_BY(mutex);

View File

@ -101,10 +101,10 @@ void DatabaseMaterializedMySQL::dropTable(ContextPtr context_, const String & na
DatabaseAtomic::dropTable(context_, name, sync);
}
void DatabaseMaterializedMySQL::attachTable(ContextPtr context_, const String & name, const StoragePtr & table, const String & relative_table_path)
void DatabaseMaterializedMySQL::attachTableUnlocked(ContextPtr context_, const String & name, const StoragePtr & table, const String & relative_table_path)
{
checkIsInternalQuery(context_, "ATTACH TABLE");
DatabaseAtomic::attachTable(context_, name, table, relative_table_path);
DatabaseAtomic::attachTableUnlocked(context_, name, table, relative_table_path);
}
StoragePtr DatabaseMaterializedMySQL::detachTable(ContextPtr context_, const String & name)

View File

@ -48,6 +48,8 @@ protected:
LoadTaskPtr startup_mysql_database_task;
void attachTableUnlocked(ContextPtr context_, const String & name, const StoragePtr & table, const String & relative_table_path) TSA_REQUIRES(mutex) override;
public:
String getEngineName() const override { return "MaterializedMySQL"; }
@ -58,8 +60,6 @@ public:
void dropTable(ContextPtr context_, const String & name, bool sync) override;
void attachTable(ContextPtr context_, const String & name, const StoragePtr & table, const String & relative_table_path) override;
StoragePtr detachTable(ContextPtr context_, const String & name) override;
void renameTable(ContextPtr context_, const String & name, IDatabase & to_database, const String & to_name, bool exchange, bool dictionary) override;

View File

@ -361,10 +361,8 @@ void DatabaseMySQL::cleanOutdatedTables()
}
}
void DatabaseMySQL::attachTable(ContextPtr /* context_ */, const String & table_name, const StoragePtr & storage, const String &)
void DatabaseMySQL::attachTableUnlocked(ContextPtr /* context_ */, const String & table_name, const StoragePtr & storage, const String &)
{
std::lock_guard lock{mutex};
if (!local_tables_cache.contains(table_name))
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Cannot attach table {}.{} because it does not exist.",
backQuoteIfNeed(database_name), backQuoteIfNeed(table_name));

View File

@ -84,9 +84,9 @@ public:
void dropTable(ContextPtr context, const String & table_name, bool sync) override;
void attachTable(ContextPtr context, const String & table_name, const StoragePtr & storage, const String & relative_table_path) override;
protected:
void attachTableUnlocked(ContextPtr context, const String & table_name, const StoragePtr & storage, const String & relative_table_path) TSA_REQUIRES(mutex) override;
ASTPtr getCreateTableQueryImpl(const String & name, ContextPtr context, bool throw_on_error) const override;
private:

View File

@ -216,10 +216,8 @@ StoragePtr DatabasePostgreSQL::fetchTable(const String & table_name, ContextPtr
}
void DatabasePostgreSQL::attachTable(ContextPtr /* context_ */, const String & table_name, const StoragePtr & storage, const String &)
void DatabasePostgreSQL::attachTableUnlocked(ContextPtr /* context_ */, const String & table_name, const StoragePtr & storage, const String &)
{
std::lock_guard lock{mutex};
if (!checkPostgresTable(table_name))
throw Exception(ErrorCodes::UNKNOWN_TABLE,
"Cannot attach PostgreSQL table {} because it does not exist in PostgreSQL (database: {})",

View File

@ -54,13 +54,14 @@ public:
void createTable(ContextPtr, const String & table_name, const StoragePtr & storage, const ASTPtr & create_query) override;
void dropTable(ContextPtr, const String & table_name, bool sync) override;
void attachTable(ContextPtr context, const String & table_name, const StoragePtr & storage, const String & relative_table_path) override;
StoragePtr detachTable(ContextPtr context, const String & table_name) override;
void drop(ContextPtr /*context*/) override;
void shutdown() override;
protected:
void attachTableUnlocked(ContextPtr context, const String & table_name, const StoragePtr & storage, const String & relative_table_path) TSA_REQUIRES(mutex) override;
ASTPtr getCreateTableQueryImpl(const String & table_name, ContextPtr context, bool throw_on_error) const override;
private:

View File

@ -216,7 +216,9 @@ PostgreSQLTableStructure::ColumnsInfoPtr readNamesAndTypesList(
/// All rows must contain the same number of dimensions, so limit 1 is ok. If number of dimensions in all rows is not the same -
/// such arrays are not able to be used as ClickHouse Array at all.
pqxx::result result{tx.exec(fmt::format("SELECT array_ndims({}) FROM {} LIMIT 1", name_and_type.name, postgres_table))};
auto dimensions = result[0][0].as<int>();
// array_ndims() may return null for empty array, but we expect 0:
// https://github.com/postgres/postgres/blob/d16a0c1e2e3874cd5adfa9ee968008b6c4b1ae01/src/backend/utils/adt/arrayfuncs.c#L1658
auto dimensions = result[0][0].as<std::optional<int>>().value_or(0);
/// It is always 1d array if it is in recheck.
DataTypePtr type = assert_cast<const DataTypeArray *>(name_and_type.type.get())->getNestedType();

View File

@ -101,12 +101,16 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
else if (settings.local_fs_method == LocalFSReadMethod::io_uring)
{
#if USE_LIBURING
static std::shared_ptr<IOUringReader> reader = std::make_shared<IOUringReader>(512);
if (!reader->isSupported())
auto global_context = Context::getGlobalContextInstance();
if (!global_context)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot obtain io_uring reader (global context not initialized)");
auto & reader = global_context->getIOURingReader();
if (!reader.isSupported())
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "io_uring is not supported by this system");
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
*reader,
reader,
settings.priority,
filename,
buffer_size,

View File

@ -15,14 +15,13 @@ namespace ErrorCodes
{
extern const int OPENSSL_ERROR;
}
}
namespace OpenSSLDetails
{
void onError(std::string error_message)
{
error_message += ". OpenSSL error code: " + std::to_string(ERR_get_error());
throw DB::Exception::createDeprecated(error_message, DB::ErrorCodes::OPENSSL_ERROR);
throw Exception(ErrorCodes::OPENSSL_ERROR, "{}. OpenSSL error code: {}", error_message, ERR_get_error());
}
StringRef foldEncryptionKeyInMySQLCompatitableMode(size_t cipher_key_size, StringRef key, std::array<char, EVP_MAX_KEY_LENGTH> & folded_key)
@ -48,4 +47,6 @@ const EVP_CIPHER * getCipherByName(StringRef cipher_name)
}
}
#endif

View File

@ -25,13 +25,14 @@
#include <string.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
}
namespace OpenSSLDetails
{
@ -60,7 +61,7 @@ struct KeyHolder
inline StringRef setKey(size_t cipher_key_size, StringRef key) const
{
if (key.size != cipher_key_size)
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Invalid key size: {} expected {}", key.size, cipher_key_size);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid key size: {} expected {}", key.size, cipher_key_size);
return key;
}
@ -72,7 +73,7 @@ struct KeyHolder<CipherMode::MySQLCompatibility>
inline StringRef setKey(size_t cipher_key_size, StringRef key)
{
if (key.size < cipher_key_size)
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Invalid key size: {} expected {}", key.size, cipher_key_size);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid key size: {} expected {}", key.size, cipher_key_size);
// MySQL does something fancy with the keys that are too long,
// ruining compatibility with OpenSSL and not improving security.
@ -118,7 +119,7 @@ inline void validateCipherMode(const EVP_CIPHER * evp_cipher)
}
}
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unsupported cipher mode");
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported cipher mode");
}
template <CipherMode mode>
@ -127,13 +128,11 @@ inline void validateIV(StringRef iv_value, const size_t cipher_iv_size)
// In MySQL mode we don't care if IV is longer than expected, only if shorter.
if ((mode == CipherMode::MySQLCompatibility && iv_value.size != 0 && iv_value.size < cipher_iv_size)
|| (mode == CipherMode::OpenSSLCompatibility && iv_value.size != 0 && iv_value.size != cipher_iv_size))
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Invalid IV size: {} expected {}", iv_value.size, cipher_iv_size);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid IV size: {} expected {}", iv_value.size, cipher_iv_size);
}
}
namespace DB
{
template <typename Impl>
class FunctionEncrypt : public IFunction
{
@ -313,12 +312,12 @@ private:
// in GCM mode IV can be of arbitrary size (>0), IV is optional for other modes.
if (mode == CipherMode::RFC5116_AEAD_AES_GCM && iv_value.size == 0)
{
throw Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Invalid IV size {} != expected size {}", iv_value.size, iv_size);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid IV size {} != expected size {}", iv_value.size, iv_size);
}
if (mode != CipherMode::RFC5116_AEAD_AES_GCM && key_value.size != key_size)
{
throw Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Invalid key size {} != expected size {}", key_value.size, key_size);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid key size {} != expected size {}", key_value.size, key_size);
}
}
@ -608,12 +607,12 @@ private:
// in GCM mode IV can be of arbitrary size (>0), for other modes IV is optional.
if (mode == CipherMode::RFC5116_AEAD_AES_GCM && iv_value.size == 0)
{
throw Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Invalid IV size {} != expected size {}", iv_value.size, iv_size);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid IV size {} != expected size {}", iv_value.size, iv_size);
}
if (key_value.size != key_size)
{
throw Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Invalid key size {} != expected size {}", key_value.size, key_size);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid key size {} != expected size {}", key_value.size, key_size);
}
}

View File

@ -28,6 +28,11 @@
# include <openssl/md4.h>
# include <openssl/md5.h>
# include <openssl/sha.h>
#if USE_BORINGSSL
# include <openssl/digest.h>
#else
# include <openssl/evp.h>
#endif
#endif
#include <bit>
@ -318,6 +323,25 @@ struct SHA512Impl
SHA512_Final(out_char_data, &ctx);
}
};
struct SHA512Impl256
{
static constexpr auto name = "SHA512_256";
enum { length = 32 };
static void apply(const char * begin, const size_t size, unsigned char * out_char_data)
{
/// Here, we use the EVP interface that is common to both BoringSSL and OpenSSL. Though BoringSSL is the default
/// SSL library that we use, for S390X architecture only OpenSSL is supported. But the SHA512-256, SHA512_256_Init,
/// SHA512_256_Update, SHA512_256_Final methods to calculate hash (similar to the other SHA functions) aren't available
/// in the current version of OpenSSL that we use which necessitates the use of the EVP interface.
auto md_ctx = EVP_MD_CTX_create();
EVP_DigestInit_ex(md_ctx, EVP_sha512_256(), nullptr /*engine*/);
EVP_DigestUpdate(md_ctx, begin, size);
EVP_DigestFinal_ex(md_ctx, out_char_data, nullptr /*size*/);
EVP_MD_CTX_destroy(md_ctx);
}
};
#endif
struct SipHash64Impl
@ -1801,6 +1825,7 @@ using FunctionSHA224 = FunctionStringHashFixedString<SHA224Impl>;
using FunctionSHA256 = FunctionStringHashFixedString<SHA256Impl>;
using FunctionSHA384 = FunctionStringHashFixedString<SHA384Impl>;
using FunctionSHA512 = FunctionStringHashFixedString<SHA512Impl>;
using FunctionSHA512_256 = FunctionStringHashFixedString<SHA512Impl256>;
#endif
using FunctionSipHash128 = FunctionAnyHash<SipHash128Impl>;
using FunctionSipHash128Keyed = FunctionAnyHash<SipHash128KeyedImpl, true, SipHash128KeyedImpl::Key, SipHash128KeyedImpl::KeyColumns>;

View File

@ -14,14 +14,163 @@ namespace DB
REGISTER_FUNCTION(HashingSSL)
{
factory.registerFunction<FunctionMD4>();
factory.registerFunction<FunctionHalfMD5>();
factory.registerFunction<FunctionMD5>();
factory.registerFunction<FunctionSHA1>();
factory.registerFunction<FunctionSHA224>();
factory.registerFunction<FunctionSHA256>();
factory.registerFunction<FunctionSHA384>();
factory.registerFunction<FunctionSHA512>();
factory.registerFunction<FunctionMD4>(FunctionDocumentation{
.description = R"(Calculates the MD4 hash of the given string.)",
.syntax = "SELECT MD4(s);",
.arguments = {{"s", "The input [String](../../sql-reference/data-types/string.md)."}},
.returned_value
= "The MD4 hash of the given input string returned as a [FixedString(16)](../../sql-reference/data-types/fixedstring.md).",
.examples
= {{"",
"SELECT HEX(MD4('abc'));",
R"(
hex(MD4('abc'))
A448017AAF21D8525FC10AE87AA6729D
)"
}}
});
factory.registerFunction<FunctionHalfMD5>(FunctionDocumentation{
.description = R"(
[Interprets](../..//sql-reference/functions/type-conversion-functions.md/#type_conversion_functions-reinterpretAsString) all the input
parameters as strings and calculates the MD5 hash value for each of them. Then combines hashes, takes the first 8 bytes of the hash of the
resulting string, and interprets them as [UInt64](../../../sql-reference/data-types/int-uint.md) in big-endian byte order. The function is
relatively slow (5 million short strings per second per processor core).
Consider using the [sipHash64](../../sql-reference/functions/hash-functions.md/#hash_functions-siphash64) function instead.
)",
.syntax = "SELECT halfMD5(par1,par2,...,parN);",
.arguments = {{"par1,par2,...,parN",
R"(
The function takes a variable number of input parameters. Arguments can be any of the supported data types. For some data types calculated
value of hash function may be the same for the same values even if types of arguments differ (integers of different size, named and unnamed
Tuple with the same data, Map and the corresponding Array(Tuple(key, value)) type with the same data).
)"
}},
.returned_value
= "The computed half MD5 hash of the given input params returned as a [UInt64](../../../sql-reference/data-types/int-uint.md) in big-endian byte order.",
.examples
= {{"",
"SELECT HEX(halfMD5('abc', 'cde', 'fgh'));",
R"(
hex(halfMD5('abc', 'cde', 'fgh'))
2C9506B7374CFAF4
)"
}}
});
factory.registerFunction<FunctionMD5>(FunctionDocumentation{
.description = R"(Calculates the MD5 hash of the given string.)",
.syntax = "SELECT MD5(s);",
.arguments = {{"s", "The input [String](../../sql-reference/data-types/string.md)."}},
.returned_value
= "The MD5 hash of the given input string returned as a [FixedString(16)](../../sql-reference/data-types/fixedstring.md).",
.examples
= {{"",
"SELECT HEX(MD5('abc'));",
R"(
hex(MD5('abc'))
900150983CD24FB0D6963F7D28E17F72
)"
}}
});
factory.registerFunction<FunctionSHA1>(FunctionDocumentation{
.description = R"(Calculates the SHA1 hash of the given string.)",
.syntax = "SELECT SHA1(s);",
.arguments = {{"s", "The input [String](../../sql-reference/data-types/string.md)."}},
.returned_value
= "The SHA1 hash of the given input string returned as a [FixedString](../../sql-reference/data-types/fixedstring.md).",
.examples
= {{"",
"SELECT HEX(SHA1('abc'));",
R"(
hex(SHA1('abc'))
A9993E364706816ABA3E25717850C26C9CD0D89D
)"
}}
});
factory.registerFunction<FunctionSHA224>(FunctionDocumentation{
.description = R"(Calculates the SHA224 hash of the given string.)",
.syntax = "SELECT SHA224(s);",
.arguments = {{"s", "The input [String](../../sql-reference/data-types/string.md)."}},
.returned_value
= "The SHA224 hash of the given input string returned as a [FixedString](../../sql-reference/data-types/fixedstring.md).",
.examples
= {{"",
"SELECT HEX(SHA224('abc'));",
R"(
hex(SHA224('abc'))
23097D223405D8228642A477BDA255B32AADBCE4BDA0B3F7E36C9DA7
)"
}}
});
factory.registerFunction<FunctionSHA256>(FunctionDocumentation{
.description = R"(Calculates the SHA256 hash of the given string.)",
.syntax = "SELECT SHA256(s);",
.arguments = {{"s", "The input [String](../../sql-reference/data-types/string.md)."}},
.returned_value
= "The SHA256 hash of the given input string returned as a [FixedString](../../sql-reference/data-types/fixedstring.md).",
.examples
= {{"",
"SELECT HEX(SHA256('abc'));",
R"(
hex(SHA256('abc'))
BA7816BF8F01CFEA414140DE5DAE2223B00361A396177A9CB410FF61F20015AD
)"
}}
});
factory.registerFunction<FunctionSHA384>(FunctionDocumentation{
.description = R"(Calculates the SHA384 hash of the given string.)",
.syntax = "SELECT SHA384(s);",
.arguments = {{"s", "The input [String](../../sql-reference/data-types/string.md)."}},
.returned_value
= "The SHA384 hash of the given input string returned as a [FixedString](../../sql-reference/data-types/fixedstring.md).",
.examples
= {{"",
"SELECT HEX(SHA384('abc'));",
R"(
hex(SHA384('abc'))
CB00753F45A35E8BB5A03D699AC65007272C32AB0EDED1631A8B605A43FF5BED8086072BA1E7CC2358BAECA134C825A7
)"
}}
});
factory.registerFunction<FunctionSHA512>(FunctionDocumentation{
.description = R"(Calculates the SHA512 hash of the given string.)",
.syntax = "SELECT SHA512(s);",
.arguments = {{"s", "The input [String](../../sql-reference/data-types/string.md)."}},
.returned_value
= "The SHA512 hash of the given input string returned as a [FixedString](../../sql-reference/data-types/fixedstring.md).",
.examples
= {{"",
"SELECT HEX(SHA512('abc'));",
R"(
hex(SHA512('abc'))
DDAF35A193617ABACC417349AE20413112E6FA4E89A97EA20A9EEEE64B55D39A2192992A274FC1A836BA3C23A3FEEBBD454D4423643CE80E2A9AC94FA54CA49F
)"
}}
});
factory.registerFunction<FunctionSHA512_256>(FunctionDocumentation{
.description = R"(Calculates the SHA512_256 hash of the given string.)",
.syntax = "SELECT SHA512_256(s);",
.arguments = {{"s", "The input [String](../../sql-reference/data-types/string.md)."}},
.returned_value
= "The SHA512_256 hash of the given input string returned as a [FixedString](../../sql-reference/data-types/fixedstring.md).",
.examples
= {{"",
"SELECT HEX(SHA512_256('abc'));",
R"(
hex(SHA512_256('abc'))
53048E2681941EF99B2E29B76B4C7DABE4C2D0C634FC6D46E0E2F13107E7AF23
)"
}}
});
}
}

View File

@ -5,6 +5,10 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionsAES.h>
namespace DB
{
namespace
{
@ -17,9 +21,6 @@ struct DecryptMySQLModeImpl
}
namespace DB
{
REGISTER_FUNCTION(AESDecryptMysql)
{
factory.registerFunction<FunctionDecrypt<DecryptMySQLModeImpl>>();

View File

@ -5,6 +5,9 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionsAES.h>
namespace DB
{
namespace
{
@ -16,9 +19,6 @@ struct EncryptMySQLModeImpl
}
namespace DB
{
REGISTER_FUNCTION(AESEncryptMysql)
{
factory.registerFunction<FunctionEncrypt<EncryptMySQLModeImpl>>();

View File

@ -7,10 +7,10 @@
#include <Functions/GatherUtils/Sinks.h>
#include <Functions/GatherUtils/Sources.h>
#include <Functions/IFunction.h>
#include <Functions/formatString.h>
#include <IO/WriteHelpers.h>
#include <base/map.h>
#include "formatString.h"
namespace DB
{

View File

@ -4,11 +4,11 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/IFunction.h>
#include <Functions/formatString.h>
#include <IO/WriteHelpers.h>
#include <base/map.h>
#include <base/range.h>
#include "formatString.h"
namespace DB
{

View File

@ -47,7 +47,7 @@ public:
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "First argument for function {} must be constant string: "
"name of datepart", getName());
datepart_param = datepart_column->getValue<String>();
datepart_param = Poco::toLower(datepart_column->getValue<String>());
if (datepart_param.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "First argument (name of datepart) for function {} cannot be empty",
getName());

View File

@ -5,6 +5,9 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionsAES.h>
namespace DB
{
namespace
{
@ -17,9 +20,6 @@ struct DecryptImpl
}
namespace DB
{
REGISTER_FUNCTION(Decrypt)
{
factory.registerFunction<FunctionDecrypt<DecryptImpl>>();

View File

@ -5,6 +5,9 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionsAES.h>
namespace DB
{
namespace
{
@ -16,9 +19,6 @@ struct EncryptImpl
}
namespace DB
{
REGISTER_FUNCTION(Encrypt)
{
factory.registerFunction<FunctionEncrypt<EncryptImpl>>();

View File

@ -1,35 +1,33 @@
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnStringHelpers.h>
#include <DataTypes/DataTypeString.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/IFunction.h>
#include <Functions/formatString.h>
#include <IO/WriteHelpers.h>
#include <base/range.h>
#include <memory>
#include <string>
#include <vector>
#include "formatString.h"
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
namespace
{
template <typename Name>
class FormatFunction : public IFunction
{
public:
static constexpr auto name = Name::name;
static constexpr auto name = "format";
static FunctionPtr create(ContextPtr) { return std::make_shared<FormatFunction>(); }
@ -52,18 +50,6 @@ public:
getName(),
arguments.size());
for (const auto arg_idx : collections::range(0, arguments.size()))
{
const auto * arg = arguments[arg_idx].get();
if (!isStringOrFixedString(arg))
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of argument {} of function {}",
arg->getName(),
arg_idx + 1,
getName());
}
return std::make_shared<DataTypeString>();
}
@ -83,6 +69,7 @@ public:
std::vector<const ColumnString::Offsets *> offsets(arguments.size() - 1);
std::vector<size_t> fixed_string_sizes(arguments.size() - 1);
std::vector<std::optional<String>> constant_strings(arguments.size() - 1);
std::vector<ColumnString::MutablePtr> converted_col_ptrs(arguments.size() - 1);
bool has_column_string = false;
bool has_column_fixed_string = false;
@ -106,8 +93,29 @@ public:
constant_strings[i - 1] = const_col->getValue<String>();
}
else
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Illegal column {} of argument of function {}",
column->getName(), getName());
{
/// A non-String/non-FixedString-type argument: use the default serialization to convert it to String
auto full_column = column->convertToFullIfNeeded();
auto serialization = arguments[i].type->getDefaultSerialization();
auto converted_col_str = ColumnString::create();
ColumnStringHelpers::WriteHelper write_helper(*converted_col_str, column->size());
auto & write_buffer = write_helper.getWriteBuffer();
FormatSettings format_settings;
for (size_t row = 0; row < column->size(); ++row)
{
serialization->serializeText(*full_column, row, write_buffer, format_settings);
write_helper.rowWritten();
}
write_helper.finalize();
/// Same as the normal `ColumnString` branch
has_column_string = true;
data[i - 1] = &converted_col_str->getChars();
offsets[i - 1] = &converted_col_str->getOffsets();
/// Keep the pointer alive
converted_col_ptrs[i - 1] = std::move(converted_col_str);
}
}
FormatStringImpl::formatExecute(
@ -127,11 +135,7 @@ public:
};
struct NameFormat
{
static constexpr auto name = "format";
};
using FunctionFormat = FormatFunction<NameFormat>;
using FunctionFormat = FormatFunction;
}

View File

@ -21,6 +21,7 @@ namespace
REGISTER_FUNCTION(FormatReadableSize)
{
factory.registerFunction<FunctionFormatReadable<Impl>>();
factory.registerAlias("FORMAT_BYTES", Impl::name, FunctionFactory::CaseInsensitive);
}
}

View File

@ -3,8 +3,12 @@
#if USE_SSL
# include <Functions/FunctionFactory.h>
# include <Functions/FunctionsAES.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionsAES.h>
namespace DB
{
namespace
{
@ -18,9 +22,6 @@ struct TryDecryptImpl
}
namespace DB
{
REGISTER_FUNCTION(TryDecrypt)
{
factory.registerFunction<FunctionDecrypt<TryDecryptImpl>>(FunctionDocumentation{

View File

@ -97,7 +97,7 @@ namespace
uint8_t * ciphertext = reinterpret_cast<uint8_t *>(out.position());
int ciphertext_size = 0;
if (!EVP_EncryptUpdate(evp_ctx, ciphertext, &ciphertext_size, &in[in_size], static_cast<int>(part_size)))
throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to encrypt");
throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to encrypt: {}", ERR_get_error());
in_size += part_size;
if (ciphertext_size)
@ -120,7 +120,7 @@ namespace
uint8_t ciphertext[kBlockSize];
int ciphertext_size = 0;
if (!EVP_EncryptUpdate(evp_ctx, ciphertext, &ciphertext_size, padded_data, safe_cast<int>(padded_data_size)))
throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to encrypt");
throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to encrypt: {}", ERR_get_error());
if (!ciphertext_size)
return 0;
@ -140,7 +140,7 @@ namespace
int ciphertext_size = 0;
if (!EVP_EncryptFinal_ex(evp_ctx,
ciphertext, &ciphertext_size))
throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to finalize encrypting");
throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to finalize encrypting: {}", ERR_get_error());
if (ciphertext_size)
out.write(reinterpret_cast<const char *>(ciphertext), ciphertext_size);
return ciphertext_size;
@ -152,7 +152,7 @@ namespace
uint8_t * plaintext = reinterpret_cast<uint8_t *>(out);
int plaintext_size = 0;
if (!EVP_DecryptUpdate(evp_ctx, plaintext, &plaintext_size, in, safe_cast<int>(size)))
throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to decrypt");
throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to decrypt: {}", ERR_get_error());
return plaintext_size;
}
@ -165,7 +165,7 @@ namespace
uint8_t plaintext[kBlockSize];
int plaintext_size = 0;
if (!EVP_DecryptUpdate(evp_ctx, plaintext, &plaintext_size, padded_data, safe_cast<int>(padded_data_size)))
throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to decrypt");
throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to decrypt: {}", ERR_get_error());
if (!plaintext_size)
return 0;
@ -184,7 +184,7 @@ namespace
uint8_t plaintext[kBlockSize];
int plaintext_size = 0;
if (!EVP_DecryptFinal_ex(evp_ctx, plaintext, &plaintext_size))
throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to finalize decrypting");
throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to finalize decrypting: {}", ERR_get_error());
if (plaintext_size)
memcpy(out, plaintext, plaintext_size);
return plaintext_size;
@ -291,11 +291,11 @@ void Encryptor::encrypt(const char * data, size_t size, WriteBuffer & out)
auto * evp_ctx = evp_ctx_ptr.get();
if (!EVP_EncryptInit_ex(evp_ctx, evp_cipher, nullptr, nullptr, nullptr))
throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to initialize encryption context with cipher");
throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to initialize encryption context with cipher: {}", ERR_get_error());
if (!EVP_EncryptInit_ex(evp_ctx, nullptr, nullptr,
reinterpret_cast<const uint8_t*>(key.c_str()), reinterpret_cast<const uint8_t*>(current_iv.c_str())))
throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to set key and IV for encryption");
throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to set key and IV for encryption: {}", ERR_get_error());
size_t in_size = 0;
size_t out_size = 0;
@ -320,7 +320,7 @@ void Encryptor::encrypt(const char * data, size_t size, WriteBuffer & out)
out_size += encryptFinal(evp_ctx, out);
if (out_size != in_size)
throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Only part of the data was encrypted");
throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Only part of the data was encrypted: {} out of {} bytes", out_size, in_size);
offset += in_size;
}
@ -335,11 +335,11 @@ void Encryptor::decrypt(const char * data, size_t size, char * out)
auto * evp_ctx = evp_ctx_ptr.get();
if (!EVP_DecryptInit_ex(evp_ctx, evp_cipher, nullptr, nullptr, nullptr))
throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to initialize decryption context with cipher");
throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to initialize decryption context with cipher: {}", ERR_get_error());
if (!EVP_DecryptInit_ex(evp_ctx, nullptr, nullptr,
reinterpret_cast<const uint8_t*>(key.c_str()), reinterpret_cast<const uint8_t*>(current_iv.c_str())))
throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to set key and IV for decryption");
throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to set key and IV for decryption: {}", ERR_get_error());
size_t in_size = 0;
size_t out_size = 0;
@ -364,7 +364,7 @@ void Encryptor::decrypt(const char * data, size_t size, char * out)
out_size += decryptFinal(evp_ctx, &out[out_size]);
if (out_size != in_size)
throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Only part of the data was decrypted");
throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Only part of the data was decrypted: {} out of {} bytes", out_size, in_size);
offset += in_size;
}

View File

@ -39,7 +39,6 @@ namespace ErrorCodes
extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int CANNOT_SELECT;
extern const int CANNOT_ADVISE;
}
@ -249,24 +248,6 @@ void ReadBufferFromFileDescriptor::rewind()
file_offset_of_buffer_end = 0;
}
/// Assuming file descriptor supports 'select', check that we have data to read or wait until timeout.
bool ReadBufferFromFileDescriptor::poll(size_t timeout_microseconds) const
{
fd_set fds;
FD_ZERO(&fds);
FD_SET(fd, &fds);
timeval timeout = { time_t(timeout_microseconds / 1000000), suseconds_t(timeout_microseconds % 1000000) };
int res = select(1, &fds, nullptr, nullptr, &timeout);
if (-1 == res)
throwFromErrno("Cannot select", ErrorCodes::CANNOT_SELECT);
return res > 0;
}
size_t ReadBufferFromFileDescriptor::getFileSize()
{
return getSizeFromFileDescriptor(fd, getFileName());

View File

@ -75,10 +75,6 @@ public:
size_t readBigAt(char * to, size_t n, size_t offset, const std::function<bool(size_t)> &) override;
bool supportsReadAt() override { return use_pread; }
private:
/// Assuming file descriptor supports 'select', check that we have data to read or wait until timeout.
bool poll(size_t timeout_microseconds) const;
};

View File

@ -35,6 +35,7 @@
#include <Disks/ObjectStorages/DiskObjectStorage.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Disks/StoragePolicy.h>
#include <Disks/IO/IOUringReader.h>
#include <IO/SynchronousReader.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Interpreters/ActionLocksManager.h>
@ -309,6 +310,11 @@ struct ContextSharedPart : boost::noncopyable
mutable OnceFlag threadpool_writer_initialized;
mutable std::unique_ptr<ThreadPool> threadpool_writer;
#if USE_LIBURING
mutable OnceFlag io_uring_reader_initialized;
mutable std::unique_ptr<IOUringReader> io_uring_reader;
#endif
mutable ThrottlerPtr replicated_fetches_throttler; /// A server-wide throttler for replicated fetches
mutable ThrottlerPtr replicated_sends_throttler; /// A server-wide throttler for replicated sends
@ -367,6 +373,7 @@ struct ContextSharedPart : boost::noncopyable
std::shared_ptr<Clusters> clusters TSA_GUARDED_BY(clusters_mutex);
ConfigurationPtr clusters_config TSA_GUARDED_BY(clusters_mutex); /// Stores updated configs
std::unique_ptr<ClusterDiscovery> cluster_discovery TSA_GUARDED_BY(clusters_mutex);
size_t clusters_version TSA_GUARDED_BY(clusters_mutex) = 0;
/// No lock required for async_insert_queue modified only during initialization
std::shared_ptr<AsynchronousInsertQueue> async_insert_queue;
@ -3523,6 +3530,14 @@ void Context::setClustersConfig(const ConfigurationPtr & config, bool enable_dis
shared->clusters = std::make_shared<Clusters>(*shared->clusters_config, settings, getMacros(), config_name);
else
shared->clusters->updateClusters(*shared->clusters_config, settings, config_name, old_clusters_config);
++shared->clusters_version;
}
size_t Context::getClustersVersion() const
{
std::lock_guard lock(shared->clusters_mutex);
return shared->clusters_version;
}
@ -4802,7 +4817,7 @@ void Context::initializeBackgroundExecutorsIfNeeded()
shared->are_background_executors_initialized = true;
}
bool Context::areBackgroundExecutorsInitialized()
bool Context::areBackgroundExecutorsInitialized() const
{
SharedLockGuard lock(shared->background_executors_mutex);
return shared->are_background_executors_initialized;
@ -4852,6 +4867,17 @@ IAsynchronousReader & Context::getThreadPoolReader(FilesystemReaderType type) co
}
}
#if USE_LIBURING
IOUringReader & Context::getIOURingReader() const
{
callOnce(shared->io_uring_reader_initialized, [&] {
shared->io_uring_reader = std::make_unique<IOUringReader>(512);
});
return *shared->io_uring_reader;
}
#endif
ThreadPool & Context::getThreadPoolWriter() const
{
callOnce(shared->threadpool_writer_initialized, [&] {

Some files were not shown because too many files have changed in this diff Show More