Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-28 02:21:59 +00:00)

Commit 3d03a44f15 — Merge branch 'master' into rschu1ze-azure-darwin2

.github/workflows/master.yml (vendored, 6 changes)
@@ -205,6 +205,12 @@ jobs:
     with:
       build_name: binary_amd64_compat
       checkout_depth: 0
+  BuilderBinAmd64Musl:
+    needs: [DockerHubPush]
+    uses: ./.github/workflows/reusable_build.yml
+    with:
+      build_name: binary_amd64_musl
+      checkout_depth: 0
   BuilderBinAarch64V80Compat:
     needs: [DockerHubPush]
     uses: ./.github/workflows/reusable_build.yml
.github/workflows/pull_request.yml (vendored, 5 changes)
@@ -242,6 +242,11 @@ jobs:
     uses: ./.github/workflows/reusable_build.yml
     with:
       build_name: binary_amd64_compat
+  BuilderBinAmd64Musl:
+    needs: [FastTest, StyleCheck]
+    uses: ./.github/workflows/reusable_build.yml
+    with:
+      build_name: binary_amd64_musl
   BuilderBinAarch64V80Compat:
     needs: [FastTest, StyleCheck]
     uses: ./.github/workflows/reusable_build.yml
@@ -30,7 +30,6 @@ int __gai_sigqueue(int sig, const union sigval val, pid_t caller_pid)
 }


-#include <sys/select.h>
 #include <stdlib.h>
 #include <features.h>

@@ -55,7 +55,6 @@ set (SRCS
     src/DigestStream.cpp
     src/DirectoryIterator.cpp
     src/DirectoryIteratorStrategy.cpp
-    src/DirectoryWatcher.cpp
     src/Environment.cpp
     src/Error.cpp
     src/ErrorHandler.cpp
@@ -1,228 +0,0 @@
-//
-// DirectoryWatcher.h
-//
-// Library: Foundation
-// Package: Filesystem
-// Module:  DirectoryWatcher
-//
-// Definition of the DirectoryWatcher class.
-//
-// Copyright (c) 2012, Applied Informatics Software Engineering GmbH.
-// and Contributors.
-//
-// SPDX-License-Identifier: BSL-1.0
-//
-
-
-#ifndef Foundation_DirectoryWatcher_INCLUDED
-#define Foundation_DirectoryWatcher_INCLUDED
-
-
-#include "Poco/Foundation.h"
-
-
-#ifndef POCO_NO_INOTIFY
-
-
-#    include "Poco/AtomicCounter.h"
-#    include "Poco/BasicEvent.h"
-#    include "Poco/File.h"
-#    include "Poco/Runnable.h"
-#    include "Poco/Thread.h"
-
-
-namespace Poco
-{
-
-
-class DirectoryWatcherStrategy;
-
-
-class Foundation_API DirectoryWatcher : protected Runnable
-/// This class is used to get notifications about changes
-/// to the filesystem, more specifically, to a specific
-/// directory. Changes to a directory are reported via
-/// events.
-///
-/// A thread will be created that watches the specified
-/// directory for changes. Events are reported in the context
-/// of this thread.
-///
-/// Note that changes to files in subdirectories of the watched
-/// directory are not reported. Separate DirectoryWatcher objects
-/// must be created for these directories if they should be watched.
-///
-/// Changes to file attributes are not reported.
-///
-/// On Windows, this class is implemented using FindFirstChangeNotification()/FindNextChangeNotification().
-/// On Linux, this class is implemented using inotify.
-/// On FreeBSD and Darwin (Mac OS X, iOS), this class uses kevent/kqueue.
-/// On all other platforms, the watched directory is periodically scanned
-/// for changes. This can negatively affect performance if done too often.
-/// Therefore, the interval in which scans are done can be specified in
-/// the constructor. Note that periodic scanning will also be done on FreeBSD
-/// and Darwin if events for changes to files (DW_ITEM_MODIFIED) are enabled.
-///
-/// DW_ITEM_MOVED_FROM and DW_ITEM_MOVED_TO events will only be reported
-/// on Linux. On other platforms, a file rename or move operation
-/// will be reported via a DW_ITEM_REMOVED and a DW_ITEM_ADDED event.
-/// The order of these two events is not defined.
-///
-/// An event mask can be specified to enable only certain events.
-{
-public:
-    enum DirectoryEventType
-    {
-        DW_ITEM_ADDED = 1,
-        /// A new item has been created and added to the directory.
-
-        DW_ITEM_REMOVED = 2,
-        /// An item has been removed from the directory.
-
-        DW_ITEM_MODIFIED = 4,
-        /// An item has been modified.
-
-        DW_ITEM_MOVED_FROM = 8,
-        /// An item has been renamed or moved. This event delivers the old name.
-
-        DW_ITEM_MOVED_TO = 16
-        /// An item has been renamed or moved. This event delivers the new name.
-    };
-
-    enum DirectoryEventMask
-    {
-        DW_FILTER_ENABLE_ALL = 31,
-        /// Enables all event types.
-
-        DW_FILTER_DISABLE_ALL = 0
-        /// Disables all event types.
-    };
-
-    enum
-    {
-        DW_DEFAULT_SCAN_INTERVAL = 5 /// Default scan interval for platforms that don't provide a native notification mechanism.
-    };
-
-    struct DirectoryEvent
-    {
-        DirectoryEvent(const File & f, DirectoryEventType ev) : item(f), event(ev) { }
-
-        const File & item; /// The directory or file that has been changed.
-        DirectoryEventType event; /// The kind of event.
-    };
-
-    BasicEvent<const DirectoryEvent> itemAdded;
-    /// Fired when a file or directory has been created or added to the directory.
-
-    BasicEvent<const DirectoryEvent> itemRemoved;
-    /// Fired when a file or directory has been removed from the directory.
-
-    BasicEvent<const DirectoryEvent> itemModified;
-    /// Fired when a file or directory has been modified.
-
-    BasicEvent<const DirectoryEvent> itemMovedFrom;
-    /// Fired when a file or directory has been renamed. This event delivers the old name.
-
-    BasicEvent<const DirectoryEvent> itemMovedTo;
-    /// Fired when a file or directory has been moved. This event delivers the new name.
-
-    BasicEvent<const Exception> scanError;
-    /// Fired when an error occurs while scanning for changes.
-
-    DirectoryWatcher(const std::string & path, int eventMask = DW_FILTER_ENABLE_ALL, int scanInterval = DW_DEFAULT_SCAN_INTERVAL);
-    /// Creates a DirectoryWatcher for the directory given in path.
-    /// To enable only specific events, an eventMask can be specified by
-    /// OR-ing the desired event IDs (e.g., DW_ITEM_ADDED | DW_ITEM_MODIFIED).
-    /// On platforms where no native filesystem notifications are available,
-    /// scanInterval specifies the interval in seconds between scans
-    /// of the directory.
-
-    DirectoryWatcher(const File & directory, int eventMask = DW_FILTER_ENABLE_ALL, int scanInterval = DW_DEFAULT_SCAN_INTERVAL);
-    /// Creates a DirectoryWatcher for the specified directory
-    /// To enable only specific events, an eventMask can be specified by
-    /// OR-ing the desired event IDs (e.g., DW_ITEM_ADDED | DW_ITEM_MODIFIED).
-    /// On platforms where no native filesystem notifications are available,
-    /// scanInterval specifies the interval in seconds between scans
-    /// of the directory.
-
-    ~DirectoryWatcher();
-    /// Destroys the DirectoryWatcher.
-
-    void suspendEvents();
-    /// Suspends sending of events. Can be called multiple times, but every
-    /// call to suspendEvent() must be matched by a call to resumeEvents().
-
-    void resumeEvents();
-    /// Resumes events, after they have been suspended with a call to suspendEvents().
-
-    bool eventsSuspended() const;
-    /// Returns true iff events are suspended.
-
-    int eventMask() const;
-    /// Returns the value of the eventMask passed to the constructor.
-
-    int scanInterval() const;
-    /// Returns the scan interval in seconds.
-
-    const File & directory() const;
-    /// Returns the directory being watched.
-
-    bool supportsMoveEvents() const;
-    /// Returns true iff the platform supports DW_ITEM_MOVED_FROM/itemMovedFrom and
-    /// DW_ITEM_MOVED_TO/itemMovedTo events.
-
-protected:
-    void init();
-    void stop();
-    void run();
-
-private:
-    DirectoryWatcher();
-    DirectoryWatcher(const DirectoryWatcher &);
-    DirectoryWatcher & operator=(const DirectoryWatcher &);
-
-    Thread _thread;
-    File _directory;
-    int _eventMask;
-    AtomicCounter _eventsSuspended;
-    int _scanInterval;
-    DirectoryWatcherStrategy * _pStrategy;
-};
-
-
-//
-// inlines
-//
-
-
-inline bool DirectoryWatcher::eventsSuspended() const
-{
-    return _eventsSuspended.value() > 0;
-}
-
-
-inline int DirectoryWatcher::eventMask() const
-{
-    return _eventMask;
-}
-
-
-inline int DirectoryWatcher::scanInterval() const
-{
-    return _scanInterval;
-}
-
-
-inline const File & DirectoryWatcher::directory() const
-{
-    return _directory;
-}
-
-
-} // namespace Poco
-
-
-#endif // POCO_NO_INOTIFY
-
-
-#endif // Foundation_DirectoryWatcher_INCLUDED
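For context on what this deletion drops: a hypothetical usage sketch of the removed `DirectoryWatcher` API, written against the deleted header above. The watched path and the handler are invented for illustration; this is not code from ClickHouse or from Poco's samples.

```cpp
#include "Poco/DirectoryWatcher.h"
#include "Poco/Delegate.h"
#include "Poco/Thread.h"
#include <iostream>

// Subscriber for the itemAdded event declared in the deleted header.
struct AddedHandler
{
    void onAdded(const void * /*sender*/, const Poco::DirectoryWatcher::DirectoryEvent & ev)
    {
        std::cout << "added: " << ev.item.path() << '\n';
    }
};

int main()
{
    // Watch a directory (hypothetical path) for newly created items only;
    // the event mask is OR-able, e.g. DW_ITEM_ADDED | DW_ITEM_MODIFIED.
    Poco::DirectoryWatcher watcher("/tmp/watched", Poco::DirectoryWatcher::DW_ITEM_ADDED);
    AddedHandler handler;
    watcher.itemAdded += Poco::delegate(&handler, &AddedHandler::onAdded);

    Poco::Thread::sleep(10000); // events fire on the watcher's own thread
}
```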
@@ -1,602 +0,0 @@
-//
-// DirectoryWatcher.cpp
-//
-// Library: Foundation
-// Package: Filesystem
-// Module:  DirectoryWatcher
-//
-// Copyright (c) 2012, Applied Informatics Software Engineering GmbH.
-// and Contributors.
-//
-// SPDX-License-Identifier: BSL-1.0
-//
-
-
-#include "Poco/DirectoryWatcher.h"
-
-
-#ifndef POCO_NO_INOTIFY
-
-
-#include "Poco/Path.h"
-#include "Poco/Glob.h"
-#include "Poco/DirectoryIterator.h"
-#include "Poco/Event.h"
-#include "Poco/Exception.h"
-#include "Poco/Buffer.h"
-#if POCO_OS == POCO_OS_LINUX || POCO_OS == POCO_OS_ANDROID
-#include <sys/inotify.h>
-#include <sys/select.h>
-#include <unistd.h>
-#elif POCO_OS == POCO_OS_MAC_OS_X || POCO_OS == POCO_OS_FREE_BSD
-#include <fcntl.h>
-#include <sys/types.h>
-#include <sys/event.h>
-#include <sys/time.h>
-#include <unistd.h>
-#if (POCO_OS == POCO_OS_FREE_BSD) && !defined(O_EVTONLY)
-#define O_EVTONLY 0x8000
-#endif
-#endif
-#include <algorithm>
-#include <atomic>
-#include <map>
-
-namespace Poco {
-
-
-class DirectoryWatcherStrategy
-{
-public:
-    DirectoryWatcherStrategy(DirectoryWatcher& owner):
-        _owner(owner)
-    {
-    }
-
-    virtual ~DirectoryWatcherStrategy()
-    {
-    }
-
-    DirectoryWatcher& owner()
-    {
-        return _owner;
-    }
-
-    virtual void run() = 0;
-    virtual void stop() = 0;
-    virtual bool supportsMoveEvents() const = 0;
-
-protected:
-    struct ItemInfo
-    {
-        ItemInfo():
-            size(0)
-        {
-        }
-
-        ItemInfo(const ItemInfo& other):
-            path(other.path),
-            size(other.size),
-            lastModified(other.lastModified)
-        {
-        }
-
-        explicit ItemInfo(const File& f):
-            path(f.path()),
-            size(f.isFile() ? f.getSize() : 0),
-            lastModified(f.getLastModified())
-        {
-        }
-
-        std::string path;
-        File::FileSize size;
-        Timestamp lastModified;
-    };
-    typedef std::map<std::string, ItemInfo> ItemInfoMap;
-
-    void scan(ItemInfoMap& entries)
-    {
-        DirectoryIterator it(owner().directory());
-        DirectoryIterator end;
-        while (it != end)
-        {
-            entries[it.path().getFileName()] = ItemInfo(*it);
-            ++it;
-        }
-    }
-
-    void compare(ItemInfoMap& oldEntries, ItemInfoMap& newEntries)
-    {
-        for (ItemInfoMap::iterator itn = newEntries.begin(); itn != newEntries.end(); ++itn)
-        {
-            ItemInfoMap::iterator ito = oldEntries.find(itn->first);
-            if (ito != oldEntries.end())
-            {
-                if ((owner().eventMask() & DirectoryWatcher::DW_ITEM_MODIFIED) && !owner().eventsSuspended())
-                {
-                    if (itn->second.size != ito->second.size || itn->second.lastModified != ito->second.lastModified)
-                    {
-                        Poco::File f(itn->second.path);
-                        DirectoryWatcher::DirectoryEvent ev(f, DirectoryWatcher::DW_ITEM_MODIFIED);
-                        owner().itemModified(&owner(), ev);
-                    }
-                }
-                oldEntries.erase(ito);
-            }
-            else if ((owner().eventMask() & DirectoryWatcher::DW_ITEM_ADDED) && !owner().eventsSuspended())
-            {
-                Poco::File f(itn->second.path);
-                DirectoryWatcher::DirectoryEvent ev(f, DirectoryWatcher::DW_ITEM_ADDED);
-                owner().itemAdded(&owner(), ev);
-            }
-        }
-        if ((owner().eventMask() & DirectoryWatcher::DW_ITEM_REMOVED) && !owner().eventsSuspended())
-        {
-            for (ItemInfoMap::iterator it = oldEntries.begin(); it != oldEntries.end(); ++it)
-            {
-                Poco::File f(it->second.path);
-                DirectoryWatcher::DirectoryEvent ev(f, DirectoryWatcher::DW_ITEM_REMOVED);
-                owner().itemRemoved(&owner(), ev);
-            }
-        }
-    }
-
-private:
-    DirectoryWatcherStrategy();
-    DirectoryWatcherStrategy(const DirectoryWatcherStrategy&);
-    DirectoryWatcherStrategy& operator = (const DirectoryWatcherStrategy&);
-
-    DirectoryWatcher& _owner;
-};
-
-
-#if POCO_OS == POCO_OS_WINDOWS_NT
-
-
-class WindowsDirectoryWatcherStrategy: public DirectoryWatcherStrategy
-{
-public:
-    WindowsDirectoryWatcherStrategy(DirectoryWatcher& owner):
-        DirectoryWatcherStrategy(owner)
-    {
-        _hStopped = CreateEventW(NULL, FALSE, FALSE, NULL);
-        if (!_hStopped)
-            throw SystemException("cannot create event");
-    }
-
-    ~WindowsDirectoryWatcherStrategy()
-    {
-        CloseHandle(_hStopped);
-    }
-
-    void run()
-    {
-        ItemInfoMap entries;
-        scan(entries);
-
-        DWORD filter = FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_DIR_NAME;
-        if (owner().eventMask() & DirectoryWatcher::DW_ITEM_MODIFIED)
-            filter |= FILE_NOTIFY_CHANGE_SIZE | FILE_NOTIFY_CHANGE_LAST_WRITE;
-
-        std::string path(owner().directory().path());
-        HANDLE hChange = FindFirstChangeNotificationA(path.c_str(), FALSE, filter);
-
-        if (hChange == INVALID_HANDLE_VALUE)
-        {
-            try
-            {
-                FileImpl::handleLastErrorImpl(path);
-            }
-            catch (Poco::Exception& exc)
-            {
-                owner().scanError(&owner(), exc);
-            }
-            return;
-        }
-
-        bool stopped = false;
-        while (!stopped)
-        {
-            try
-            {
-                HANDLE h[2];
-                h[0] = _hStopped;
-                h[1] = hChange;
-                switch (WaitForMultipleObjects(2, h, FALSE, INFINITE))
-                {
-                case WAIT_OBJECT_0:
-                    stopped = true;
-                    break;
-                case WAIT_OBJECT_0 + 1:
-                    {
-                        ItemInfoMap newEntries;
-                        scan(newEntries);
-                        compare(entries, newEntries);
-                        std::swap(entries, newEntries);
-                        if (FindNextChangeNotification(hChange) == FALSE)
-                        {
-                            FileImpl::handleLastErrorImpl(path);
-                        }
-                    }
-                    break;
-                default:
-                    throw SystemException("failed to wait for directory changes");
-                }
-            }
-            catch (Poco::Exception& exc)
-            {
-                owner().scanError(&owner(), exc);
-            }
-        }
-        FindCloseChangeNotification(hChange);
-    }
-
-    void stop()
-    {
-        SetEvent(_hStopped);
-    }
-
-    bool supportsMoveEvents() const
-    {
-        return false;
-    }
-
-private:
-    HANDLE _hStopped;
-};
-
-
-#elif POCO_OS == POCO_OS_LINUX || POCO_OS == POCO_OS_ANDROID
-
-
-class LinuxDirectoryWatcherStrategy: public DirectoryWatcherStrategy
-{
-public:
-    LinuxDirectoryWatcherStrategy(DirectoryWatcher& owner):
-        DirectoryWatcherStrategy(owner),
-        _fd(-1),
-        _stopped(false)
-    {
-        _fd = inotify_init();
-        if (_fd == -1) throw Poco::IOException("cannot initialize inotify", errno);
-    }
-
-    ~LinuxDirectoryWatcherStrategy()
-    {
-        close(_fd);
-    }
-
-    void run()
-    {
-        int mask = 0;
-        if (owner().eventMask() & DirectoryWatcher::DW_ITEM_ADDED)
-            mask |= IN_CREATE;
-        if (owner().eventMask() & DirectoryWatcher::DW_ITEM_REMOVED)
-            mask |= IN_DELETE;
-        if (owner().eventMask() & DirectoryWatcher::DW_ITEM_MODIFIED)
-            mask |= IN_MODIFY;
-        if (owner().eventMask() & DirectoryWatcher::DW_ITEM_MOVED_FROM)
-            mask |= IN_MOVED_FROM;
-        if (owner().eventMask() & DirectoryWatcher::DW_ITEM_MOVED_TO)
-            mask |= IN_MOVED_TO;
-        int wd = inotify_add_watch(_fd, owner().directory().path().c_str(), mask);
-        if (wd == -1)
-        {
-            try
-            {
-                FileImpl::handleLastErrorImpl(owner().directory().path());
-            }
-            catch (Poco::Exception& exc)
-            {
-                owner().scanError(&owner(), exc);
-            }
-        }
-
-        Poco::Buffer<char> buffer(4096);
-        while (!_stopped.load(std::memory_order_relaxed))
-        {
-            fd_set fds;
-            FD_ZERO(&fds);
-            FD_SET(_fd, &fds);
-
-            struct timeval tv;
-            tv.tv_sec = 0;
-            tv.tv_usec = 200000;
-
-            if (select(_fd + 1, &fds, NULL, NULL, &tv) == 1)
-            {
-                int n = read(_fd, buffer.begin(), buffer.size());
-                int i = 0;
-                if (n > 0)
-                {
-                    while (n > 0)
-                    {
-                        struct inotify_event* event = reinterpret_cast<struct inotify_event*>(buffer.begin() + i);
-
-                        if (event->len > 0)
-                        {
-                            if (!owner().eventsSuspended())
-                            {
-                                Poco::Path p(owner().directory().path());
-                                p.makeDirectory();
-                                p.setFileName(event->name);
-                                Poco::File f(p.toString());
-
-                                if ((event->mask & IN_CREATE) && (owner().eventMask() & DirectoryWatcher::DW_ITEM_ADDED))
-                                {
-                                    DirectoryWatcher::DirectoryEvent ev(f, DirectoryWatcher::DW_ITEM_ADDED);
-                                    owner().itemAdded(&owner(), ev);
-                                }
-                                if ((event->mask & IN_DELETE) && (owner().eventMask() & DirectoryWatcher::DW_ITEM_REMOVED))
-                                {
-                                    DirectoryWatcher::DirectoryEvent ev(f, DirectoryWatcher::DW_ITEM_REMOVED);
-                                    owner().itemRemoved(&owner(), ev);
-                                }
-                                if ((event->mask & IN_MODIFY) && (owner().eventMask() & DirectoryWatcher::DW_ITEM_MODIFIED))
-                                {
-                                    DirectoryWatcher::DirectoryEvent ev(f, DirectoryWatcher::DW_ITEM_MODIFIED);
-                                    owner().itemModified(&owner(), ev);
-                                }
-                                if ((event->mask & IN_MOVED_FROM) && (owner().eventMask() & DirectoryWatcher::DW_ITEM_MOVED_FROM))
-                                {
-                                    DirectoryWatcher::DirectoryEvent ev(f, DirectoryWatcher::DW_ITEM_MOVED_FROM);
-                                    owner().itemMovedFrom(&owner(), ev);
-                                }
-                                if ((event->mask & IN_MOVED_TO) && (owner().eventMask() & DirectoryWatcher::DW_ITEM_MOVED_TO))
-                                {
-                                    DirectoryWatcher::DirectoryEvent ev(f, DirectoryWatcher::DW_ITEM_MOVED_TO);
-                                    owner().itemMovedTo(&owner(), ev);
-                                }
-                            }
-                        }
-
-                        i += sizeof(inotify_event) + event->len;
-                        n -= sizeof(inotify_event) + event->len;
-                    }
-                }
-            }
-        }
-    }
-
-    void stop()
-    {
-        _stopped.store(true, std::memory_order_relaxed);
-    }
-
-    bool supportsMoveEvents() const
-    {
-        return true;
-    }
-
-private:
-    int _fd;
-    std::atomic<bool> _stopped;
-};
-
-
-#elif POCO_OS == POCO_OS_MAC_OS_X || POCO_OS == POCO_OS_FREE_BSD
-
-
-class BSDDirectoryWatcherStrategy: public DirectoryWatcherStrategy
-{
-public:
-    BSDDirectoryWatcherStrategy(DirectoryWatcher& owner):
-        DirectoryWatcherStrategy(owner),
-        _queueFD(-1),
-        _dirFD(-1),
-        _stopped(false)
-    {
-        _dirFD = open(owner.directory().path().c_str(), O_EVTONLY);
-        if (_dirFD < 0) throw Poco::FileNotFoundException(owner.directory().path());
-        _queueFD = kqueue();
-        if (_queueFD < 0)
-        {
-            close(_dirFD);
-            throw Poco::SystemException("Cannot create kqueue", errno);
-        }
-    }
-
-    ~BSDDirectoryWatcherStrategy()
-    {
-        close(_dirFD);
-        close(_queueFD);
-    }
-
-    void run()
-    {
-        Poco::Timestamp lastScan;
-        ItemInfoMap entries;
-        scan(entries);
-
-        while (!_stopped.load(std::memory_order_relaxed))
-        {
-            struct timespec timeout;
-            timeout.tv_sec = 0;
-            timeout.tv_nsec = 200000000;
-            unsigned eventFilter = NOTE_WRITE;
-            struct kevent event;
-            struct kevent eventData;
-            EV_SET(&event, _dirFD, EVFILT_VNODE, EV_ADD | EV_CLEAR, eventFilter, 0, 0);
-            int nEvents = kevent(_queueFD, &event, 1, &eventData, 1, &timeout);
-            if (nEvents < 0 || eventData.flags == EV_ERROR)
-            {
-                try
-                {
-                    FileImpl::handleLastErrorImpl(owner().directory().path());
-                }
-                catch (Poco::Exception& exc)
-                {
-                    owner().scanError(&owner(), exc);
-                }
-            }
-            else if (nEvents > 0 || ((owner().eventMask() & DirectoryWatcher::DW_ITEM_MODIFIED) && lastScan.isElapsed(owner().scanInterval()*1000000)))
-            {
-                ItemInfoMap newEntries;
-                scan(newEntries);
-                compare(entries, newEntries);
-                std::swap(entries, newEntries);
-                lastScan.update();
-            }
-        }
-    }
-
-    void stop()
-    {
-        _stopped.store(true, std::memory_order_relaxed);
-    }
-
-    bool supportsMoveEvents() const
-    {
-        return false;
-    }
-
-private:
-    int _queueFD;
-    int _dirFD;
-    std::atomic<bool> _stopped;
-};
-
-
-#else
-
-
-class PollingDirectoryWatcherStrategy: public DirectoryWatcherStrategy
-{
-public:
-    PollingDirectoryWatcherStrategy(DirectoryWatcher& owner):
-        DirectoryWatcherStrategy(owner)
-    {
-    }
-
-    ~PollingDirectoryWatcherStrategy()
-    {
-    }
-
-    void run()
-    {
-        ItemInfoMap entries;
-        scan(entries);
-        while (!_stopped.tryWait(1000*owner().scanInterval()))
-        {
-            try
-            {
-                ItemInfoMap newEntries;
-                scan(newEntries);
-                compare(entries, newEntries);
-                std::swap(entries, newEntries);
-            }
-            catch (Poco::Exception& exc)
-            {
-                owner().scanError(&owner(), exc);
-            }
-        }
-    }
-
-    void stop()
-    {
-        _stopped.set();
-    }
-
-    bool supportsMoveEvents() const
-    {
-        return false;
-    }
-
-private:
-    Poco::Event _stopped;
-};
-
-
-#endif
-
-
-DirectoryWatcher::DirectoryWatcher(const std::string& path, int eventMask, int scanInterval):
-    _directory(path),
-    _eventMask(eventMask),
-    _scanInterval(scanInterval)
-{
-    init();
-}
-
-
-DirectoryWatcher::DirectoryWatcher(const Poco::File& directory, int eventMask, int scanInterval):
-    _directory(directory),
-    _eventMask(eventMask),
-    _scanInterval(scanInterval)
-{
-    init();
-}
-
-
-DirectoryWatcher::~DirectoryWatcher()
-{
-    try
-    {
-        stop();
-        delete _pStrategy;
-    }
-    catch (...)
-    {
-        poco_unexpected();
-    }
-}
-
-
-void DirectoryWatcher::suspendEvents()
-{
-    poco_assert (_eventsSuspended > 0);
-
-    _eventsSuspended--;
-}
-
-
-void DirectoryWatcher::resumeEvents()
-{
-    _eventsSuspended++;
-}
-
-
-void DirectoryWatcher::init()
-{
-    if (!_directory.exists())
-        throw Poco::FileNotFoundException(_directory.path());
-
-    if (!_directory.isDirectory())
-        throw Poco::InvalidArgumentException("not a directory", _directory.path());
-
-#if POCO_OS == POCO_OS_WINDOWS_NT
-    _pStrategy = new WindowsDirectoryWatcherStrategy(*this);
-#elif POCO_OS == POCO_OS_LINUX || POCO_OS == POCO_OS_ANDROID
-    _pStrategy = new LinuxDirectoryWatcherStrategy(*this);
-#elif POCO_OS == POCO_OS_MAC_OS_X || POCO_OS == POCO_OS_FREE_BSD
-    _pStrategy = new BSDDirectoryWatcherStrategy(*this);
-#else
-    _pStrategy = new PollingDirectoryWatcherStrategy(*this);
-#endif
-    _thread.start(*this);
-}
-
-
-void DirectoryWatcher::run()
-{
-    _pStrategy->run();
-}
-
-
-void DirectoryWatcher::stop()
-{
-    _pStrategy->stop();
-    _thread.join();
-}
-
-
-bool DirectoryWatcher::supportsMoveEvents() const
-{
-    return _pStrategy->supportsMoveEvents();
-}
-
-
-} // namespace Poco
-
-
-#endif // POCO_NO_INOTIFY
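The Linux strategy in the file deleted above is a thin wrapper around inotify. As a point of reference, here is a minimal self-contained sketch of that flow; the watched path and event mask are illustrative assumptions, and this is not the Poco implementation:

```cpp
#include <sys/inotify.h>
#include <unistd.h>
#include <cstdio>

int main()
{
    // One inotify instance; the deleted strategy kept this fd for its lifetime.
    int fd = inotify_init();
    if (fd == -1) { perror("inotify_init"); return 1; }

    // Watch a directory for the event kinds the DW_ITEM_* mask mapped to.
    int wd = inotify_add_watch(fd, "/tmp", IN_CREATE | IN_DELETE | IN_MODIFY);
    if (wd == -1) { perror("inotify_add_watch"); return 1; }

    // Read one batch of variable-length events and walk them in place,
    // using the same stride arithmetic as the deleted run() loop.
    alignas(inotify_event) char buf[4096];
    ssize_t n = read(fd, buf, sizeof(buf)); // blocks until an event arrives
    for (ssize_t i = 0; i < n;)
    {
        auto * event = reinterpret_cast<struct inotify_event *>(buf + i);
        if (event->len > 0)
            std::printf("mask=0x%x name=%s\n", event->mask, event->name);
        i += sizeof(struct inotify_event) + event->len;
    }
    close(fd);
}
```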
@@ -42,10 +42,8 @@ if (CMAKE_CROSSCOMPILING)
         if (ARCH_AARCH64)
             # FIXME: broken dependencies
             set (ENABLE_GRPC OFF CACHE INTERNAL "")
-            set (ENABLE_SENTRY OFF CACHE INTERNAL "")
         elseif (ARCH_PPC64LE)
             set (ENABLE_GRPC OFF CACHE INTERNAL "")
-            set (ENABLE_SENTRY OFF CACHE INTERNAL "")
         elseif (ARCH_RISCV64)
             # RISC-V support is preliminary
             set (GLIBC_COMPATIBILITY OFF CACHE INTERNAL "")
@@ -73,19 +71,10 @@ if (CMAKE_CROSSCOMPILING)
         message (FATAL_ERROR "Trying to cross-compile to unsupported system: ${CMAKE_SYSTEM_NAME}!")
     endif ()

-    if (USE_MUSL)
-        # use of undeclared identifier 'PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP'
-        set (ENABLE_SENTRY OFF CACHE INTERNAL "")
-        set (ENABLE_ODBC OFF CACHE INTERNAL "")
-        set (ENABLE_GRPC OFF CACHE INTERNAL "")
-        set (ENABLE_HDFS OFF CACHE INTERNAL "")
-        set (ENABLE_EMBEDDED_COMPILER OFF CACHE INTERNAL "")
-        # use of drand48_data
-        set (ENABLE_AZURE_BLOB_STORAGE OFF CACHE INTERNAL "")
-    endif ()
-
-    # Don't know why but CXX_STANDARD doesn't work for cross-compilation
-    set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++20")
-
     message (STATUS "Cross-compiling for target: ${CMAKE_CXX_COMPILE_TARGET}")
 endif ()

+if (USE_MUSL)
+    # Does not work for unknown reason
+    set (ENABLE_RUST OFF CACHE INTERNAL "")
+endif ()
contrib/azure (vendored, 2 changes)
@@ -1 +1 @@
-Subproject commit d94ae337c8ffbf74e99d412ac55e38f2190490f5
+Subproject commit 352ff0a61cb319ac1cc38c4058443ddf70147530

contrib/libhdfs3 (vendored, 2 changes)
@@ -1 +1 @@
-Subproject commit bdcb91354b1c05b21e73043a112a6f1e3b013497
+Subproject commit b9598e6016720a7c088bfe85ce1fa0410f9d2103
@@ -26,6 +26,11 @@ ADD_DEFINITIONS(-D__STDC_FORMAT_MACROS)
 ADD_DEFINITIONS(-D_GNU_SOURCE)
 ADD_DEFINITIONS(-D_GLIBCXX_USE_NANOSLEEP)
 ADD_DEFINITIONS(-DHAVE_NANOSLEEP)

+if (USE_MUSL)
+    ADD_DEFINITIONS(-DSTRERROR_R_RETURN_INT)
+endif ()
+
 set(HAVE_STEADY_CLOCK 1)
 set(HAVE_NESTED_EXCEPTION 1)
 SET(HAVE_BOOST_CHRONO 0)
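A side note on the `STRERROR_R_RETURN_INT` define added above: glibc (with `_GNU_SOURCE`, which this file also defines) exposes a `strerror_r` returning `char *`, while musl ships only the POSIX variant returning `int`. A sketch of the dispatch such a macro typically guards — illustrative only, not libhdfs3's actual code:

```cpp
#include <cstring>
#include <string>

std::string describeErrno(int err)
{
    char buf[256];
#ifdef STRERROR_R_RETURN_INT
    // POSIX/musl variant: returns int, writes the message into buf.
    if (strerror_r(err, buf, sizeof(buf)) != 0)
        return "Unknown error";
    return buf;
#else
    // GNU variant: returns a pointer that may be buf or an internal string.
    return strerror_r(err, buf, sizeof(buf));
#endif
}
```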
contrib/llvm-project (vendored, 2 changes)
@@ -1 +1 @@
-Subproject commit e7b8befca85c8b847614432dba250c22d35fbae0
+Subproject commit 1834e42289c58402c804a87be4d489892b88f3ec
@@ -117,7 +117,7 @@ endif()

 add_definitions(-DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX)

-if (OS_LINUX OR OS_FREEBSD)
+if ((OS_LINUX OR OS_FREEBSD) AND NOT USE_MUSL)
     add_definitions(-DROCKSDB_PTHREAD_ADAPTIVE_MUTEX)
 endif()

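`ROCKSDB_PTHREAD_ADAPTIVE_MUTEX` relies on `PTHREAD_MUTEX_ADAPTIVE_NP`, a glibc extension that musl does not define, which is presumably why the condition now excludes `USE_MUSL`. An illustrative sketch, close to what RocksDB's POSIX port does but not its actual code, of the construct that fails to compile on musl without the guard:

```cpp
#include <pthread.h>

void initAdaptiveMutex(pthread_mutex_t * mutex)
{
    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
#ifdef ROCKSDB_PTHREAD_ADAPTIVE_MUTEX
    // glibc-only mutex kind: spins briefly under contention before sleeping.
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ADAPTIVE_NP);
#endif
    pthread_mutex_init(mutex, &attr);
    pthread_mutexattr_destroy(&attr);
}
```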
contrib/sentry-native (vendored, 2 changes)
@@ -1 +1 @@
-Subproject commit ae10fb8c224c3f41571446e1ed7fd57b9e5e366b
+Subproject commit bc359f86cbf0f73f6fd4b6bfb4ede0c1f8c9400f
@@ -13,6 +13,7 @@ set (SRC_DIR "${ClickHouse_SOURCE_DIR}/contrib/sentry-native")

 set (SRCS
     ${SRC_DIR}/vendor/mpack.c
+    ${SRC_DIR}/vendor/stb_sprintf.c
     ${SRC_DIR}/src/sentry_alloc.c
     ${SRC_DIR}/src/sentry_backend.c
     ${SRC_DIR}/src/sentry_core.c
@@ -21,6 +22,7 @@ set (SRCS
     ${SRC_DIR}/src/sentry_json.c
     ${SRC_DIR}/src/sentry_logger.c
     ${SRC_DIR}/src/sentry_options.c
+    ${SRC_DIR}/src/sentry_os.c
     ${SRC_DIR}/src/sentry_random.c
     ${SRC_DIR}/src/sentry_ratelimiter.c
     ${SRC_DIR}/src/sentry_scope.c
@@ -29,6 +31,7 @@ set (SRCS
     ${SRC_DIR}/src/sentry_string.c
     ${SRC_DIR}/src/sentry_sync.c
     ${SRC_DIR}/src/sentry_transport.c
+    ${SRC_DIR}/src/sentry_tracing.c
     ${SRC_DIR}/src/sentry_utils.c
     ${SRC_DIR}/src/sentry_uuid.c
     ${SRC_DIR}/src/sentry_value.c
@@ -1,7 +1,7 @@
 option (ENABLE_ODBC "Enable ODBC library" ${ENABLE_LIBRARIES})
-if (NOT OS_LINUX)
+if (NOT OS_LINUX OR USE_MUSL)
     if (ENABLE_ODBC)
-        message(STATUS "ODBC is only supported on Linux")
+        message(STATUS "ODBC is only supported on Linux with dynamic linking")
     endif()
     set (ENABLE_ODBC OFF CACHE INTERNAL "")
 endif ()
@@ -145,6 +145,7 @@ def parse_env_variables(
     RISCV_SUFFIX = "-riscv64"
     S390X_SUFFIX = "-s390x"
     AMD64_COMPAT_SUFFIX = "-amd64-compat"
+    AMD64_MUSL_SUFFIX = "-amd64-musl"

     result = []
     result.append("OUTPUT_DIR=/output")
@@ -163,6 +164,7 @@ def parse_env_variables(
     is_cross_s390x = compiler.endswith(S390X_SUFFIX)
     is_cross_freebsd = compiler.endswith(FREEBSD_SUFFIX)
     is_amd64_compat = compiler.endswith(AMD64_COMPAT_SUFFIX)
+    is_amd64_musl = compiler.endswith(AMD64_MUSL_SUFFIX)

     if is_cross_darwin:
         cc = compiler[: -len(DARWIN_SUFFIX)]
@@ -232,6 +234,12 @@ def parse_env_variables(
         cc = compiler[: -len(AMD64_COMPAT_SUFFIX)]
         result.append("DEB_ARCH=amd64")
         cmake_flags.append("-DNO_SSE3_OR_HIGHER=1")
+    elif is_amd64_musl:
+        cc = compiler[: -len(AMD64_MUSL_SUFFIX)]
+        result.append("DEB_ARCH=amd64")
+        cmake_flags.append(
+            "-DCMAKE_TOOLCHAIN_FILE=/build/cmake/linux/toolchain-x86_64-musl.cmake"
+        )
     else:
         cc = compiler
         result.append("DEB_ARCH=amd64")
@@ -396,6 +404,7 @@ def parse_args() -> argparse.Namespace:
             "clang-17-riscv64",
             "clang-17-s390x",
             "clang-17-amd64-compat",
+            "clang-17-amd64-musl",
             "clang-17-freebsd",
         ),
         default="clang-17",
@@ -16,7 +16,7 @@ export LLVM_VERSION=${LLVM_VERSION:-17}
 # it being undefined. Also read it as array so that we can pass an empty list
 # of additional variable to cmake properly, and it doesn't generate an extra
 # empty parameter.
-# Read it as CMAKE_FLAGS to not lose exported FASTTEST_CMAKE_FLAGS on subsequential launch
+# Read it as CMAKE_FLAGS to not lose exported FASTTEST_CMAKE_FLAGS on subsequent launch
 read -ra CMAKE_FLAGS <<< "${FASTTEST_CMAKE_FLAGS:-}"

 # Run only matching tests.
|
|||||||
|
|
||||||
(
|
(
|
||||||
cd "$FASTTEST_BUILD"
|
cd "$FASTTEST_BUILD"
|
||||||
cmake "$FASTTEST_SOURCE" -DCMAKE_CXX_COMPILER="clang++-${LLVM_VERSION}" -DCMAKE_C_COMPILER="clang-${LLVM_VERSION}" "${CMAKE_LIBS_CONFIG[@]}" "${CMAKE_FLAGS[@]}" 2>&1 | ts '%Y-%m-%d %H:%M:%S' | tee "$FASTTEST_OUTPUT/cmake_log.txt"
|
cmake "$FASTTEST_SOURCE" -DCMAKE_CXX_COMPILER="clang++-${LLVM_VERSION}" -DCMAKE_C_COMPILER="clang-${LLVM_VERSION}" -DCMAKE_TOOLCHAIN_FILE="${FASTTEST_SOURCE}/cmake/linux/toolchain-x86_64-musl.cmake" "${CMAKE_LIBS_CONFIG[@]}" "${CMAKE_FLAGS[@]}" 2>&1 | ts '%Y-%m-%d %H:%M:%S' | tee "$FASTTEST_OUTPUT/cmake_log.txt"
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -28,18 +28,20 @@ sudo apt-get install clang-17
 Let’s remember the path where we install `cctools` as ${CCTOOLS}

 ``` bash
+mkdir ~/cctools
 export CCTOOLS=$(cd ~/cctools && pwd)
-mkdir ${CCTOOLS}
 cd ${CCTOOLS}

-git clone --depth=1 https://github.com/tpoechtrager/apple-libtapi.git
+git clone https://github.com/tpoechtrager/apple-libtapi.git
 cd apple-libtapi
+git checkout 15dfc2a8c9a2a89d06ff227560a69f5265b692f9
 INSTALLPREFIX=${CCTOOLS} ./build.sh
 ./install.sh
 cd ..

-git clone --depth=1 https://github.com/tpoechtrager/cctools-port.git
+git clone https://github.com/tpoechtrager/cctools-port.git
 cd cctools-port/cctools
+git checkout 2a3e1c2a6ff54a30f898b70cfb9ba1692a55fad7
 ./configure --prefix=$(readlink -f ${CCTOOLS}) --with-libtapi=$(readlink -f ${CCTOOLS}) --target=x86_64-apple-darwin
 make install
 ```
@@ -90,152 +90,11 @@ Views look the same as normal tables. For example, they are listed in the result

 To delete a view, use [DROP VIEW](../../../sql-reference/statements/drop.md#drop-view). Although `DROP TABLE` works for VIEWs as well.

-## Live View [Experimental]
+## Live View [Deprecated]

-:::note
-This is an experimental feature that may change in backwards-incompatible ways in the future releases. Enable usage of live views and `WATCH` query using [allow_experimental_live_view](../../../operations/settings/settings.md#allow-experimental-live-view) setting. Input the command `set allow_experimental_live_view = 1`.
-:::
+This feature is deprecated and will be removed in the future.

-```sql
-CREATE LIVE VIEW [IF NOT EXISTS] [db.]table_name [WITH REFRESH [value_in_sec]] AS SELECT ...
-```
+For your convenience, the old documentation is located [here](https://pastila.nl/?00f32652/fdf07272a7b54bda7e13b919264e449f.md)
-
-Live views store result of the corresponding [SELECT](../../../sql-reference/statements/select/index.md) query and are updated any time the result of the query changes. Query result as well as partial result needed to combine with new data are stored in memory providing increased performance for repeated queries. Live views can provide push notifications when query result changes using the [WATCH](../../../sql-reference/statements/watch.md) query.
-
-Live views are triggered by insert into the innermost table specified in the query.
-
-Live views work similarly to how a query in a distributed table works. But instead of combining partial results from different servers they combine partial result from current data with partial result from the new data. When a live view query includes a subquery then the cached partial result is only stored for the innermost subquery.
-
-:::info
-- [Table function](../../../sql-reference/table-functions/index.md) is not supported as the innermost table.
-- Tables that do not have inserts such as a [dictionary](../../../sql-reference/dictionaries/index.md), [system table](../../../operations/system-tables/index.md), a [normal view](#normal), or a [materialized view](#materialized) will not trigger a live view.
-- Only queries where one can combine partial result from the old data plus partial result from the new data will work. Live view will not work for queries that require the complete data set to compute the final result or aggregations where the state of the aggregation must be preserved.
-- Does not work with replicated or distributed tables where inserts are performed on different nodes.
-- Can't be triggered by multiple tables.
-
-See [WITH REFRESH](#live-view-with-refresh) to force periodic updates of a live view that in some cases can be used as a workaround.
-:::
-
-### Monitoring Live View Changes
-
-You can monitor changes in the `LIVE VIEW` query result using [WATCH](../../../sql-reference/statements/watch.md) query.
-
-```sql
-WATCH [db.]live_view
-```
-
-**Example:**
-
-```sql
-CREATE TABLE mt (x Int8) Engine = MergeTree ORDER BY x;
-CREATE LIVE VIEW lv AS SELECT sum(x) FROM mt;
-```
-Watch a live view while doing a parallel insert into the source table.
-
-```sql
-WATCH lv;
-```
-
-```bash
-┌─sum(x)─┬─_version─┐
-│      1 │        1 │
-└────────┴──────────┘
-┌─sum(x)─┬─_version─┐
-│      3 │        2 │
-└────────┴──────────┘
-┌─sum(x)─┬─_version─┐
-│      6 │        3 │
-└────────┴──────────┘
-```
-
-```sql
-INSERT INTO mt VALUES (1);
-INSERT INTO mt VALUES (2);
-INSERT INTO mt VALUES (3);
-```
-
-Or add [EVENTS](../../../sql-reference/statements/watch.md#events-clause) clause to just get change events.
-
-```sql
-WATCH [db.]live_view EVENTS;
-```
-
-**Example:**
-
-```sql
-WATCH lv EVENTS;
-```
-
-```bash
-┌─version─┐
-│       1 │
-└─────────┘
-┌─version─┐
-│       2 │
-└─────────┘
-┌─version─┐
-│       3 │
-└─────────┘
-```
-
-You can execute [SELECT](../../../sql-reference/statements/select/index.md) query on a live view in the same way as for any regular view or a table. If the query result is cached it will return the result immediately without running the stored query on the underlying tables.
-
-```sql
-SELECT * FROM [db.]live_view WHERE ...
-```
-
-### Force Live View Refresh
-
-You can force live view refresh using the `ALTER LIVE VIEW [db.]table_name REFRESH` statement.
-
-### WITH REFRESH Clause
-
-When a live view is created with a `WITH REFRESH` clause then it will be automatically refreshed after the specified number of seconds elapse since the last refresh or trigger.
-
-```sql
-CREATE LIVE VIEW [db.]table_name WITH REFRESH [value_in_sec] AS SELECT ...
-```
-
-If the refresh value is not specified then the value specified by the [periodic_live_view_refresh](../../../operations/settings/settings.md#periodic-live-view-refresh) setting is used.
-
-**Example:**
-
-```sql
-CREATE LIVE VIEW lv WITH REFRESH 5 AS SELECT now();
-WATCH lv
-```
-
-```bash
-┌───────────────now()─┬─_version─┐
-│ 2021-02-21 08:47:05 │        1 │
-└─────────────────────┴──────────┘
-┌───────────────now()─┬─_version─┐
-│ 2021-02-21 08:47:10 │        2 │
-└─────────────────────┴──────────┘
-┌───────────────now()─┬─_version─┐
-│ 2021-02-21 08:47:15 │        3 │
-└─────────────────────┴──────────┘
-```
-
-```sql
-WATCH lv
-```
-
-```
-Code: 60. DB::Exception: Received from localhost:9000. DB::Exception: Table default.lv does not exist..
-```
-
-### Live View Usage
-
-Most common uses of live view tables include:
-
-- Providing push notifications for query result changes to avoid polling.
-- Caching results of most frequent queries to provide immediate query results.
-- Watching for table changes and triggering a follow-up select queries.
-- Watching metrics from system tables using periodic refresh.
-
-**See Also**
-- [ALTER LIVE VIEW](../alter/view.md#alter-live-view)

 ## Window View [Experimental]

@@ -7,7 +7,7 @@
 #include <cassert>
 #include <cstring>
 #include <unistd.h>
-#include <sys/select.h>
+#include <poll.h>
 #include <sys/time.h>
 #include <sys/types.h>

@@ -27,11 +27,8 @@ void trim(String & s)
 /// Allows delaying the start of query execution until the entirety of query is inserted.
 bool hasInputData()
 {
-    timeval timeout = {0, 0};
-    fd_set fds{};
-    FD_ZERO(&fds);
-    FD_SET(STDIN_FILENO, &fds);
-    return select(1, &fds, nullptr, nullptr, &timeout) == 1;
+    pollfd fd{STDIN_FILENO, POLLIN, 0};
+    return poll(&fd, 1, 0) == 1;
 }

 struct NoCaseCompare
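For reference, the new check as a standalone function. One practical advantage (not stated by the commit itself) is that `poll()` has no `FD_SETSIZE` ceiling on descriptor numbers, whereas `select()`'s `fd_set` does:

```cpp
#include <poll.h>
#include <unistd.h>

// Non-blocking readiness probe for stdin: a zero-millisecond timeout makes
// poll() return immediately, and a return value of 1 means POLLIN is set.
bool stdinHasData()
{
    pollfd fd{STDIN_FILENO, POLLIN, 0};
    return poll(&fd, 1, /* timeout_ms = */ 0) == 1;
}
```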
@@ -238,6 +238,7 @@
     M(DictCacheLockReadNs, "Number of nanoseconds spend in waiting for read lock to lookup the data for the dictionaries of 'cache' types.") \
     \
     M(DistributedSyncInsertionTimeoutExceeded, "A timeout has exceeded while waiting for shards during synchronous insertion into a Distributed table (with 'distributed_foreground_insert' = 1)") \
+    M(DistributedAsyncInsertionFailures, "Number of failures for asynchronous insertion into a Distributed table (with 'distributed_foreground_insert' = 0)") \
     M(DataAfterMergeDiffersFromReplica, R"(
 Number of times data after merge is not byte-identical to the data on another replicas. There could be several reasons:
 1. Using newer version of compression library after server update.
@@ -52,8 +52,7 @@ void Pool::Entry::decrementRefCount()
 Pool::Pool(const Poco::Util::AbstractConfiguration & cfg, const std::string & config_name,
         unsigned default_connections_, unsigned max_connections_,
         const char * parent_config_name_)
-    : logger(Poco::Logger::get("mysqlxx::Pool"))
-    , default_connections(default_connections_)
+    : default_connections(default_connections_)
     , max_connections(max_connections_)
 {
     server = cfg.getString(config_name + ".host");
@@ -127,6 +126,38 @@ Pool::Pool(const Poco::Util::AbstractConfiguration & cfg, const std::string & co
 }


+Pool::Pool(
+    const std::string & db_,
+    const std::string & server_,
+    const std::string & user_,
+    const std::string & password_,
+    unsigned port_,
+    const std::string & socket_,
+    unsigned connect_timeout_,
+    unsigned rw_timeout_,
+    unsigned default_connections_,
+    unsigned max_connections_,
+    unsigned enable_local_infile_,
+    bool opt_reconnect_)
+    : default_connections(default_connections_)
+    , max_connections(max_connections_)
+    , db(db_)
+    , server(server_)
+    , user(user_)
+    , password(password_)
+    , port(port_)
+    , socket(socket_)
+    , connect_timeout(connect_timeout_)
+    , rw_timeout(rw_timeout_)
+    , enable_local_infile(enable_local_infile_)
+    , opt_reconnect(opt_reconnect_)
+{
+    LOG_DEBUG(log,
+        "Created MySQL Pool with settings: connect_timeout={}, read_write_timeout={}, default_connections_number={}, max_connections_number={}",
+        connect_timeout, rw_timeout, default_connections, max_connections);
+}
+
+
 Pool::~Pool()
 {
     std::lock_guard lock(mutex);
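A hypothetical call site for the explicit constructor added above. Every argument value here is invented for illustration; only the parameter order comes from the diff, and the header path is assumed:

```cpp
#include <mysqlxx/Pool.h>

int main()
{
    mysqlxx::Pool pool(
        /* db */ "analytics",
        /* server */ "mysql.example.internal",
        /* user */ "reader",
        /* password */ "secret",
        /* port */ 3306,
        /* socket */ "",
        /* connect_timeout */ 10,
        /* rw_timeout */ 300,
        /* default_connections */ 2,
        /* max_connections */ 16,
        /* enable_local_infile */ 0,
        /* opt_reconnect */ true);

    // Borrow a connection; the Entry returns it to the pool on destruction.
    mysqlxx::Pool::Entry entry = pool.get(/* wait_timeout */ 60);
}
```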
@ -148,29 +179,29 @@ Pool::Entry Pool::get(uint64_t wait_timeout)
|
|||||||
initialize();
|
initialize();
|
||||||
for (;;)
|
for (;;)
|
||||||
{
|
{
|
||||||
logger.trace("(%s): Iterating through existing MySQL connections", getDescription());
|
LOG_TRACE(log, "{}: Iterating through existing MySQL connections", getDescription());
|
||||||
|
|
||||||
for (auto & connection : connections)
|
for (auto & connection : connections)
|
||||||
{
|
{
|
||||||
if (connection->ref_count == 0)
|
if (connection->ref_count == 0)
|
||||||
{
|
{
|
||||||
logger.test("Found free connection in pool, returning it to the caller");
|
LOG_TEST(log, "Found free connection in pool, returning it to the caller");
|
||||||
return Entry(connection, this);
|
return Entry(connection, this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.trace("(%s): Trying to allocate a new connection.", getDescription());
|
LOG_TRACE(log, "{}: Trying to allocate a new connection.", getDescription());
|
||||||
if (connections.size() < static_cast<size_t>(max_connections))
|
if (connections.size() < static_cast<size_t>(max_connections))
|
||||||
{
|
{
|
||||||
Connection * conn = allocConnection();
|
Connection * conn = allocConnection();
|
||||||
if (conn)
|
if (conn)
|
||||||
return Entry(conn, this);
|
return Entry(conn, this);
|
||||||
|
|
||||||
logger.trace("(%s): Unable to create a new connection: Allocation failed.", getDescription());
|
LOG_TRACE(log, "{}: Unable to create a new connection: Allocation failed.", getDescription());
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
logger.trace("(%s): Unable to create a new connection: Max number of connections has been reached.", getDescription());
|
LOG_TRACE(log, "{}: Unable to create a new connection: Max number of connections has been reached.", getDescription());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!wait_timeout)
|
if (!wait_timeout)
|
||||||
@ -180,7 +211,7 @@ Pool::Entry Pool::get(uint64_t wait_timeout)
|
|||||||
throw Poco::Exception("mysqlxx::Pool is full (connection_wait_timeout is exceeded)");
|
throw Poco::Exception("mysqlxx::Pool is full (connection_wait_timeout is exceeded)");
|
||||||
|
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
logger.trace("(%s): Sleeping for %d seconds.", getDescription(), MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
|
LOG_TRACE(log, "{}: Sleeping for {} seconds.", getDescription(), MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
|
||||||
sleepForSeconds(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
|
sleepForSeconds(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
|
||||||
lock.lock();
|
lock.lock();
|
||||||
}
|
}
|
||||||
@ -206,7 +237,7 @@ Pool::Entry Pool::tryGet()
|
|||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.debug("(%s): Idle connection to MySQL server cannot be recovered, dropping it.", getDescription());
|
LOG_DEBUG(log, "{}: Idle connection to MySQL server cannot be recovered, dropping it.", getDescription());
|
||||||
|
|
||||||
/// This one is disconnected, cannot be reestablished and so needs to be disposed of.
|
/// This one is disconnected, cannot be reestablished and so needs to be disposed of.
|
||||||
connection_it = connections.erase(connection_it);
|
connection_it = connections.erase(connection_it);
|
||||||
@ -229,7 +260,7 @@ Pool::Entry Pool::tryGet()
|
|||||||
|
|
||||||
void Pool::removeConnection(Connection* connection)
|
void Pool::removeConnection(Connection* connection)
|
||||||
{
|
{
|
||||||
logger.trace("(%s): Removing connection.", getDescription());
|
LOG_TRACE(log, "{}: Removing connection.", getDescription());
|
||||||
|
|
||||||
std::lock_guard lock(mutex);
|
std::lock_guard lock(mutex);
|
||||||
if (connection)
|
if (connection)
|
||||||
@ -260,8 +291,8 @@ void Pool::Entry::forceConnected() const
|
|||||||
else
|
else
|
||||||
sleepForSeconds(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
|
sleepForSeconds(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
|
||||||
|
|
||||||
pool->logger.debug(
|
LOG_DEBUG(pool->log,
|
||||||
"Creating a new MySQL connection to %s with settings: connect_timeout=%u, read_write_timeout=%u",
|
"Creating a new MySQL connection to {} with settings: connect_timeout={}, read_write_timeout={}",
|
||||||
pool->description, pool->connect_timeout, pool->rw_timeout);
|
pool->description, pool->connect_timeout, pool->rw_timeout);
|
||||||
|
|
||||||
data->conn.connect(
|
data->conn.connect(
|
||||||
@ -287,21 +318,21 @@ bool Pool::Entry::tryForceConnected() const
|
|||||||
auto * const mysql_driver = data->conn.getDriver();
|
auto * const mysql_driver = data->conn.getDriver();
|
||||||
const auto prev_connection_id = mysql_thread_id(mysql_driver);
|
const auto prev_connection_id = mysql_thread_id(mysql_driver);
|
||||||
|
|
||||||
pool->logger.trace("Entry(connection %lu): sending PING to check if it is alive.", prev_connection_id);
|
LOG_TRACE(pool->log, "Entry(connection {}): sending PING to check if it is alive.", prev_connection_id);
|
||||||
if (data->conn.ping()) /// Attempts to reestablish lost connection
|
if (data->conn.ping()) /// Attempts to reestablish lost connection
|
||||||
{
|
{
|
||||||
const auto current_connection_id = mysql_thread_id(mysql_driver);
|
const auto current_connection_id = mysql_thread_id(mysql_driver);
|
||||||
if (prev_connection_id != current_connection_id)
|
if (prev_connection_id != current_connection_id)
|
||||||
{
|
{
|
||||||
pool->logger.debug("Entry(connection %lu): Reconnected to MySQL server. Connection id changed: %lu -> %lu",
|
LOG_DEBUG(pool->log, "Entry(connection {}): Reconnected to MySQL server. Connection id changed: {} -> {}",
|
||||||
current_connection_id, prev_connection_id, current_connection_id);
|
current_connection_id, prev_connection_id, current_connection_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
pool->logger.trace("Entry(connection %lu): PING ok.", current_connection_id);
|
LOG_TRACE(pool->log, "Entry(connection {}): PING ok.", current_connection_id);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
pool->logger.trace("Entry(connection %lu): PING failed.", prev_connection_id);
|
LOG_TRACE(pool->log, "Entry(connection {}): PING failed.", prev_connection_id);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -326,10 +357,10 @@ Pool::Connection * Pool::allocConnection(bool dont_throw_if_failed_first_time)
|
|||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
logger.debug("Connecting to %s", description);
|
LOG_DEBUG(log, "Connecting to {}", description);
|
||||||
|
|
||||||
logger.debug(
|
LOG_DEBUG(log,
|
||||||
"Creating a new MySQL connection to %s with settings: connect_timeout=%u, read_write_timeout=%u",
|
"Creating a new MySQL connection to {} with settings: connect_timeout={}, read_write_timeout={}",
|
||||||
description, connect_timeout, rw_timeout);
|
description, connect_timeout, rw_timeout);
|
||||||
|
|
||||||
conn_ptr->conn.connect(
|
conn_ptr->conn.connect(
|
||||||
@ -349,7 +380,7 @@ Pool::Connection * Pool::allocConnection(bool dont_throw_if_failed_first_time)
|
|||||||
}
|
}
|
||||||
catch (mysqlxx::ConnectionFailed & e)
|
catch (mysqlxx::ConnectionFailed & e)
|
||||||
{
|
{
|
||||||
logger.error(e.what());
|
LOG_ERROR(log, "Failed to connect to MySQL ({}): {}", description, e.what());
|
||||||
|
|
||||||
if ((!was_successful && !dont_throw_if_failed_first_time)
|
if ((!was_successful && !dont_throw_if_failed_first_time)
|
||||||
|| e.errnum() == ER_ACCESS_DENIED_ERROR
|
|| e.errnum() == ER_ACCESS_DENIED_ERROR
|
||||||
|
@@ -169,28 +169,10 @@ public:
         unsigned default_connections_ = MYSQLXX_POOL_DEFAULT_START_CONNECTIONS,
         unsigned max_connections_ = MYSQLXX_POOL_DEFAULT_MAX_CONNECTIONS,
         unsigned enable_local_infile_ = MYSQLXX_DEFAULT_ENABLE_LOCAL_INFILE,
-        bool opt_reconnect_ = MYSQLXX_DEFAULT_MYSQL_OPT_RECONNECT)
-        : logger(Poco::Logger::get("mysqlxx::Pool"))
-        , default_connections(default_connections_)
-        , max_connections(max_connections_)
-        , db(db_)
-        , server(server_)
-        , user(user_)
-        , password(password_)
-        , port(port_)
-        , socket(socket_)
-        , connect_timeout(connect_timeout_)
-        , rw_timeout(rw_timeout_)
-        , enable_local_infile(enable_local_infile_)
-        , opt_reconnect(opt_reconnect_)
-    {
-        logger.debug(
-            "Created MySQL Pool with settings: connect_timeout=%u, read_write_timeout=%u, default_connections_number=%u, max_connections_number=%u",
-            connect_timeout, rw_timeout, default_connections, max_connections);
-    }
+        bool opt_reconnect_ = MYSQLXX_DEFAULT_MYSQL_OPT_RECONNECT);
 
     Pool(const Pool & other)
-        : logger(other.logger), default_connections{other.default_connections},
+        : default_connections{other.default_connections},
           max_connections{other.max_connections},
           db{other.db}, server{other.server},
           user{other.user}, password{other.password},
@@ -220,7 +202,7 @@ public:
     void removeConnection(Connection * connection);
 
 protected:
-    Poco::Logger & logger;
+    Poco::Logger * log = &Poco::Logger::get("mysqlxx::Pool");
 
     /// Number of MySQL connections which are created at launch.
     unsigned default_connections;
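Note: the hunks above migrate mysqlxx::Pool from a Poco::Logger reference with printf-style placeholders to ClickHouse's fmt-based LOG_* macros over a Poco::Logger pointer. A minimal sketch of the difference, standalone and with illustrative message text:

    #include <Poco/Logger.h>
    #include <Common/logger_useful.h>   /// ClickHouse header providing LOG_TRACE/LOG_DEBUG

    void example(Poco::Logger * log, const std::string & description)
    {
        /// Old style: Poco formats eagerly with printf-like placeholders.
        Poco::Logger::get("mysqlxx::Pool").trace("(%s): Trying to allocate a new connection.", description);

        /// New style: fmt-style braces; the macro checks the log level first,
        /// so arguments are not formatted when tracing is disabled.
        LOG_TRACE(log, "{}: Trying to allocate a new connection.", description);
    }

Switching the member from a `Poco::Logger &` to a pointer with an in-class initializer is also what lets the copy constructor drop its `logger(other.logger)` initializer.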
@@ -15,14 +15,13 @@ namespace ErrorCodes
 {
     extern const int OPENSSL_ERROR;
 }
-}
 
 namespace OpenSSLDetails
 {
 void onError(std::string error_message)
 {
-    error_message += ". OpenSSL error code: " + std::to_string(ERR_get_error());
-    throw DB::Exception::createDeprecated(error_message, DB::ErrorCodes::OPENSSL_ERROR);
+    throw Exception(ErrorCodes::OPENSSL_ERROR, "{}. OpenSSL error code: {}", error_message, ERR_get_error());
 }
 
 StringRef foldEncryptionKeyInMySQLCompatitableMode(size_t cipher_key_size, StringRef key, std::array<char, EVP_MAX_KEY_LENGTH> & folded_key)
@@ -48,4 +47,6 @@ const EVP_CIPHER * getCipherByName(StringRef cipher_name)
 
 }
 
+}
 
 #endif
@@ -25,13 +25,14 @@
 
 #include <string.h>
 
 
 namespace DB
 {
 namespace ErrorCodes
 {
     extern const int BAD_ARGUMENTS;
 }
-}
 
 namespace OpenSSLDetails
 {
@@ -60,7 +61,7 @@ struct KeyHolder
     inline StringRef setKey(size_t cipher_key_size, StringRef key) const
     {
         if (key.size != cipher_key_size)
-            throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Invalid key size: {} expected {}", key.size, cipher_key_size);
+            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid key size: {} expected {}", key.size, cipher_key_size);
 
         return key;
     }
@@ -72,7 +73,7 @@ struct KeyHolder<CipherMode::MySQLCompatibility>
     inline StringRef setKey(size_t cipher_key_size, StringRef key)
     {
         if (key.size < cipher_key_size)
-            throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Invalid key size: {} expected {}", key.size, cipher_key_size);
+            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid key size: {} expected {}", key.size, cipher_key_size);
 
         // MySQL does something fancy with the keys that are too long,
         // ruining compatibility with OpenSSL and not improving security.
@@ -118,7 +119,7 @@ inline void validateCipherMode(const EVP_CIPHER * evp_cipher)
         }
     }
 
-    throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unsupported cipher mode");
+    throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported cipher mode");
 }
 
 template <CipherMode mode>
@@ -127,13 +128,11 @@ inline void validateIV(StringRef iv_value, const size_t cipher_iv_size)
     // In MySQL mode we don't care if IV is longer than expected, only if shorter.
     if ((mode == CipherMode::MySQLCompatibility && iv_value.size != 0 && iv_value.size < cipher_iv_size)
         || (mode == CipherMode::OpenSSLCompatibility && iv_value.size != 0 && iv_value.size != cipher_iv_size))
-        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Invalid IV size: {} expected {}", iv_value.size, cipher_iv_size);
+        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid IV size: {} expected {}", iv_value.size, cipher_iv_size);
 }
 
 }
 
-namespace DB
-{
 template <typename Impl>
 class FunctionEncrypt : public IFunction
 {
@@ -313,12 +312,12 @@ private:
         // in GCM mode IV can be of arbitrary size (>0), IV is optional for other modes.
         if (mode == CipherMode::RFC5116_AEAD_AES_GCM && iv_value.size == 0)
         {
-            throw Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Invalid IV size {} != expected size {}", iv_value.size, iv_size);
+            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid IV size {} != expected size {}", iv_value.size, iv_size);
         }
 
         if (mode != CipherMode::RFC5116_AEAD_AES_GCM && key_value.size != key_size)
         {
-            throw Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Invalid key size {} != expected size {}", key_value.size, key_size);
+            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid key size {} != expected size {}", key_value.size, key_size);
         }
     }
 
@@ -608,12 +607,12 @@ private:
         // in GCM mode IV can be of arbitrary size (>0), for other modes IV is optional.
         if (mode == CipherMode::RFC5116_AEAD_AES_GCM && iv_value.size == 0)
        {
-            throw Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Invalid IV size {} != expected size {}", iv_value.size, iv_size);
+            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid IV size {} != expected size {}", iv_value.size, iv_size);
         }
 
         if (key_value.size != key_size)
         {
-            throw Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Invalid key size {} != expected size {}", key_value.size, key_size);
+            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid key size {} != expected size {}", key_value.size, key_size);
         }
     }
 
@@ -5,6 +5,10 @@
 #include <Functions/FunctionFactory.h>
 #include <Functions/FunctionsAES.h>
 
+
+namespace DB
+{
+
 namespace
 {
 
@@ -17,9 +21,6 @@ struct DecryptMySQLModeImpl
 
 }
 
-namespace DB
-{
-
 REGISTER_FUNCTION(AESDecryptMysql)
 {
     factory.registerFunction<FunctionDecrypt<DecryptMySQLModeImpl>>();
@@ -5,6 +5,9 @@
 #include <Functions/FunctionFactory.h>
 #include <Functions/FunctionsAES.h>
 
+namespace DB
+{
+
 namespace
 {
 
@@ -16,9 +19,6 @@ struct EncryptMySQLModeImpl
 
 }
 
-namespace DB
-{
-
 REGISTER_FUNCTION(AESEncryptMysql)
 {
     factory.registerFunction<FunctionEncrypt<EncryptMySQLModeImpl>>();
@@ -5,6 +5,9 @@
 #include <Functions/FunctionFactory.h>
 #include <Functions/FunctionsAES.h>
 
+namespace DB
+{
+
 namespace
 {
 
@@ -17,9 +20,6 @@ struct DecryptImpl
 
 }
 
-namespace DB
-{
-
 REGISTER_FUNCTION(Decrypt)
 {
     factory.registerFunction<FunctionDecrypt<DecryptImpl>>();
@@ -5,6 +5,9 @@
 #include <Functions/FunctionFactory.h>
 #include <Functions/FunctionsAES.h>
 
+namespace DB
+{
+
 namespace
 {
 
@@ -16,9 +19,6 @@ struct EncryptImpl
 
 }
 
-namespace DB
-{
-
 REGISTER_FUNCTION(Encrypt)
 {
     factory.registerFunction<FunctionEncrypt<EncryptImpl>>();
@@ -3,8 +3,12 @@
 
 #if USE_SSL
 
-#    include <Functions/FunctionFactory.h>
-#    include <Functions/FunctionsAES.h>
+#include <Functions/FunctionFactory.h>
+#include <Functions/FunctionsAES.h>
+
+
+namespace DB
+{
 
 namespace
 {
@@ -18,9 +22,6 @@ struct TryDecryptImpl
 
 }
 
-namespace DB
-{
-
 REGISTER_FUNCTION(TryDecrypt)
 {
     factory.registerFunction<FunctionDecrypt<TryDecryptImpl>>(FunctionDocumentation{
@@ -97,7 +97,7 @@ namespace
         uint8_t * ciphertext = reinterpret_cast<uint8_t *>(out.position());
         int ciphertext_size = 0;
         if (!EVP_EncryptUpdate(evp_ctx, ciphertext, &ciphertext_size, &in[in_size], static_cast<int>(part_size)))
-            throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to encrypt");
+            throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to encrypt: {}", ERR_get_error());
 
         in_size += part_size;
         if (ciphertext_size)
@@ -120,7 +120,7 @@ namespace
         uint8_t ciphertext[kBlockSize];
         int ciphertext_size = 0;
         if (!EVP_EncryptUpdate(evp_ctx, ciphertext, &ciphertext_size, padded_data, safe_cast<int>(padded_data_size)))
-            throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to encrypt");
+            throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to encrypt: {}", ERR_get_error());
 
         if (!ciphertext_size)
             return 0;
@@ -140,7 +140,7 @@ namespace
         int ciphertext_size = 0;
         if (!EVP_EncryptFinal_ex(evp_ctx,
             ciphertext, &ciphertext_size))
-            throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to finalize encrypting");
+            throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to finalize encrypting: {}", ERR_get_error());
         if (ciphertext_size)
             out.write(reinterpret_cast<const char *>(ciphertext), ciphertext_size);
         return ciphertext_size;
@@ -152,7 +152,7 @@ namespace
         uint8_t * plaintext = reinterpret_cast<uint8_t *>(out);
         int plaintext_size = 0;
         if (!EVP_DecryptUpdate(evp_ctx, plaintext, &plaintext_size, in, safe_cast<int>(size)))
-            throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to decrypt");
+            throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to decrypt: {}", ERR_get_error());
         return plaintext_size;
     }
 
@@ -165,7 +165,7 @@ namespace
         uint8_t plaintext[kBlockSize];
         int plaintext_size = 0;
         if (!EVP_DecryptUpdate(evp_ctx, plaintext, &plaintext_size, padded_data, safe_cast<int>(padded_data_size)))
-            throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to decrypt");
+            throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to decrypt: {}", ERR_get_error());
 
         if (!plaintext_size)
             return 0;
@@ -184,7 +184,7 @@ namespace
         uint8_t plaintext[kBlockSize];
         int plaintext_size = 0;
         if (!EVP_DecryptFinal_ex(evp_ctx, plaintext, &plaintext_size))
-            throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to finalize decrypting");
+            throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to finalize decrypting: {}", ERR_get_error());
         if (plaintext_size)
             memcpy(out, plaintext, plaintext_size);
         return plaintext_size;
@@ -291,11 +291,11 @@ void Encryptor::encrypt(const char * data, size_t size, WriteBuffer & out)
     auto * evp_ctx = evp_ctx_ptr.get();
 
     if (!EVP_EncryptInit_ex(evp_ctx, evp_cipher, nullptr, nullptr, nullptr))
-        throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to initialize encryption context with cipher");
+        throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to initialize encryption context with cipher: {}", ERR_get_error());
 
     if (!EVP_EncryptInit_ex(evp_ctx, nullptr, nullptr,
         reinterpret_cast<const uint8_t*>(key.c_str()), reinterpret_cast<const uint8_t*>(current_iv.c_str())))
-        throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to set key and IV for encryption");
+        throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to set key and IV for encryption: {}", ERR_get_error());
 
     size_t in_size = 0;
     size_t out_size = 0;
@@ -320,7 +320,7 @@ void Encryptor::encrypt(const char * data, size_t size, WriteBuffer & out)
     out_size += encryptFinal(evp_ctx, out);
 
     if (out_size != in_size)
-        throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Only part of the data was encrypted");
+        throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Only part of the data was encrypted: {} out of {} bytes", out_size, in_size);
     offset += in_size;
 }
 
@@ -335,11 +335,11 @@ void Encryptor::decrypt(const char * data, size_t size, char * out)
     auto * evp_ctx = evp_ctx_ptr.get();
 
     if (!EVP_DecryptInit_ex(evp_ctx, evp_cipher, nullptr, nullptr, nullptr))
-        throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to initialize decryption context with cipher");
+        throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to initialize decryption context with cipher: {}", ERR_get_error());
 
     if (!EVP_DecryptInit_ex(evp_ctx, nullptr, nullptr,
         reinterpret_cast<const uint8_t*>(key.c_str()), reinterpret_cast<const uint8_t*>(current_iv.c_str())))
-        throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to set key and IV for decryption");
+        throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Failed to set key and IV for decryption: {}", ERR_get_error());
 
     size_t in_size = 0;
     size_t out_size = 0;
@@ -364,7 +364,7 @@ void Encryptor::decrypt(const char * data, size_t size, char * out)
     out_size += decryptFinal(evp_ctx, &out[out_size]);
 
     if (out_size != in_size)
-        throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Only part of the data was decrypted");
+        throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Only part of the data was decrypted: {} out of {} bytes", out_size, in_size);
     offset += in_size;
 }
 
|
|||||||
extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
|
extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
|
||||||
extern const int ARGUMENT_OUT_OF_BOUND;
|
extern const int ARGUMENT_OUT_OF_BOUND;
|
||||||
extern const int CANNOT_SEEK_THROUGH_FILE;
|
extern const int CANNOT_SEEK_THROUGH_FILE;
|
||||||
extern const int CANNOT_SELECT;
|
|
||||||
extern const int CANNOT_ADVISE;
|
extern const int CANNOT_ADVISE;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -249,24 +248,6 @@ void ReadBufferFromFileDescriptor::rewind()
|
|||||||
file_offset_of_buffer_end = 0;
|
file_offset_of_buffer_end = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/// Assuming file descriptor supports 'select', check that we have data to read or wait until timeout.
|
|
||||||
bool ReadBufferFromFileDescriptor::poll(size_t timeout_microseconds) const
|
|
||||||
{
|
|
||||||
fd_set fds;
|
|
||||||
FD_ZERO(&fds);
|
|
||||||
FD_SET(fd, &fds);
|
|
||||||
timeval timeout = { time_t(timeout_microseconds / 1000000), suseconds_t(timeout_microseconds % 1000000) };
|
|
||||||
|
|
||||||
int res = select(1, &fds, nullptr, nullptr, &timeout);
|
|
||||||
|
|
||||||
if (-1 == res)
|
|
||||||
throwFromErrno("Cannot select", ErrorCodes::CANNOT_SELECT);
|
|
||||||
|
|
||||||
return res > 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
size_t ReadBufferFromFileDescriptor::getFileSize()
|
size_t ReadBufferFromFileDescriptor::getFileSize()
|
||||||
{
|
{
|
||||||
return getSizeFromFileDescriptor(fd, getFileName());
|
return getSizeFromFileDescriptor(fd, getFileName());
|
||||||
|
@ -75,10 +75,6 @@ public:
|
|||||||
|
|
||||||
size_t readBigAt(char * to, size_t n, size_t offset, const std::function<bool(size_t)> &) override;
|
size_t readBigAt(char * to, size_t n, size_t offset, const std::function<bool(size_t)> &) override;
|
||||||
bool supportsReadAt() override { return use_pread; }
|
bool supportsReadAt() override { return use_pread; }
|
||||||
|
|
||||||
private:
|
|
||||||
/// Assuming file descriptor supports 'select', check that we have data to read or wait until timeout.
|
|
||||||
bool poll(size_t timeout_microseconds) const;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
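Note: the deleted poll() was evidently unused (its private declaration is removed without touching any caller), and it also carried a latent bug: it passed 1 as the first argument to select(), while POSIX expects the highest descriptor plus one, so the call only behaved as intended when fd was 0. For reference, a hedged sketch of an equivalent readiness check with poll(2), which has no nfds pitfall (not part of this commit; millisecond rather than microsecond granularity):

    #include <poll.h>
    #include <cerrno>
    #include <stdexcept>
    #include <string>

    /// Return true if `fd` becomes readable within the timeout.
    bool waitReadable(int fd, int timeout_ms)
    {
        pollfd pfd{fd, POLLIN, 0};
        int res = ::poll(&pfd, 1, timeout_ms);
        if (res < 0)
            throw std::runtime_error("poll failed, errno: " + std::to_string(errno));
        return res > 0;
    }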
@@ -4811,7 +4811,7 @@ void Context::initializeBackgroundExecutorsIfNeeded()
     shared->are_background_executors_initialized = true;
 }
 
-bool Context::areBackgroundExecutorsInitialized()
+bool Context::areBackgroundExecutorsInitialized() const
 {
     SharedLockGuard lock(shared->background_executors_mutex);
     return shared->are_background_executors_initialized;
@@ -1202,7 +1202,7 @@ public:
 
     /// Background executors related methods
     void initializeBackgroundExecutorsIfNeeded();
-    bool areBackgroundExecutorsInitialized();
+    bool areBackgroundExecutorsInitialized() const;
 
     MergeMutateBackgroundExecutorPtr getMergeMutateExecutor() const;
     OrdinaryBackgroundExecutorPtr getMovesExecutor() const;
@@ -5,6 +5,7 @@
 #include <Storages/StorageDistributed.h>
 #include <QueryPipeline/RemoteInserter.h>
 #include <Common/CurrentMetrics.h>
+#include <base/defines.h>
 #include <IO/Operators.h>
 #include <IO/WriteBufferFromFile.h>
 
@@ -163,6 +164,22 @@ void DistributedAsyncInsertBatch::deserialize()
     readText(in);
 }
 
+bool DistributedAsyncInsertBatch::valid()
+{
+    chassert(!files.empty());
+
+    bool res = true;
+    for (const auto & file : files)
+    {
+        if (!fs::exists(file))
+        {
+            LOG_WARNING(parent.log, "File {} does not exists, likely due abnormal shutdown", file);
+            res = false;
+        }
+    }
+    return res;
+}
+
 void DistributedAsyncInsertBatch::writeText(WriteBuffer & out)
 {
     for (const auto & file : files)
@@ -201,14 +218,6 @@ void DistributedAsyncInsertBatch::sendBatch()
 {
     for (const auto & file : files)
     {
-        /// In case of recovery it is possible that some of files will be
-        /// missing, if server had been restarted abnormally
-        if (recovered && !fs::exists(file))
-        {
-            LOG_WARNING(parent.log, "File {} does not exists, likely due abnormal shutdown", file);
-            continue;
-        }
-
         ReadBufferFromFile in(file);
         const auto & distributed_header = DistributedAsyncInsertHeader::read(in, parent.log);
 
@@ -18,9 +18,16 @@ public:
     bool isEnoughSize() const;
     void send();
 
+    /// Write batch to current_batch.txt
     void serialize();
+
+    /// Read batch from current_batch.txt
     void deserialize();
 
+    /// Does all required files exists?
+    /// (The only way variant when it is valid is during restoring batch from disk).
+    bool valid();
+
     size_t total_rows = 0;
     size_t total_bytes = 0;
     std::vector<std::string> files;
@@ -16,13 +16,14 @@
 #include <Common/StringUtils/StringUtils.h>
 #include <Common/SipHash.h>
 #include <Common/quoteString.h>
-#include <base/hex.h>
+#include <Common/ProfileEvents.h>
 #include <Common/ActionBlocker.h>
 #include <Common/formatReadable.h>
 #include <Common/Stopwatch.h>
 #include <Common/logger_useful.h>
 #include <Compression/CheckingCompressedReadBuffer.h>
 #include <IO/Operators.h>
+#include <base/hex.h>
 #include <boost/algorithm/string/find_iterator.hpp>
 #include <boost/algorithm/string/finder.hpp>
 #include <boost/range/adaptor/indexed.hpp>
@@ -38,6 +39,11 @@ namespace CurrentMetrics
     extern const Metric BrokenDistributedBytesToInsert;
 }
 
+namespace ProfileEvents
+{
+    extern const Event DistributedAsyncInsertionFailures;
+}
+
 namespace fs = std::filesystem;
 
 namespace DB
@@ -195,6 +201,15 @@ void DistributedAsyncInsertDirectoryQueue::run()
             /// No errors while processing existing files.
             /// Let's see maybe there are more files to process.
             do_sleep = false;
+
+            const auto now = std::chrono::system_clock::now();
+            if (now - last_decrease_time > decrease_error_count_period)
+            {
+                std::lock_guard status_lock(status_mutex);
+
+                status.error_count /= 2;
+                last_decrease_time = now;
+            }
         }
         catch (...)
         {
@@ -213,15 +228,6 @@ void DistributedAsyncInsertDirectoryQueue::run()
         else
             LOG_TEST(LogFrequencyLimiter(log, 30), "Skipping send data over distributed table.");
 
-        const auto now = std::chrono::system_clock::now();
-        if (now - last_decrease_time > decrease_error_count_period)
-        {
-            std::lock_guard status_lock(status_mutex);
-
-            status.error_count /= 2;
-            last_decrease_time = now;
-        }
-
         if (do_sleep)
             break;
     }
@@ -376,6 +382,8 @@ try
 }
 catch (...)
 {
+    ProfileEvents::increment(ProfileEvents::DistributedAsyncInsertionFailures);
+
     std::lock_guard status_lock(status_mutex);
 
     ++status.error_count;
@@ -516,6 +524,15 @@ void DistributedAsyncInsertDirectoryQueue::processFilesWithBatching()
 
         DistributedAsyncInsertBatch batch(*this);
         batch.deserialize();
+
+        /// In case of recovery it is possible that some of files will be
+        /// missing, if server had been restarted abnormally
+        /// (between unlink(*.bin) and unlink(current_batch.txt)).
+        ///
+        /// But current_batch_file_path should be removed anyway, since if some
+        /// file was missing, then the batch is not complete and there is no
+        /// point in trying to pretend that it will not break deduplication.
+        if (batch.valid())
             batch.send();
 
         auto dir_sync_guard = getDirectorySyncGuard(relative_path);
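Note: `status.error_count /= 2` once per `decrease_error_count_period` is exponential decay of the failure counter, and the hunks above move it into the success branch, so the counter now only decays on iterations that processed all files without errors. A reduced, self-contained sketch of the bookkeeping (names and the period value are illustrative, not taken from the source):

    #include <chrono>
    #include <mutex>

    struct ErrorDecay
    {
        std::mutex mutex;
        size_t error_count = 0;
        std::chrono::system_clock::time_point last_decrease_time = std::chrono::system_clock::now();
        std::chrono::milliseconds decrease_period{5 * 60 * 1000};   /// hypothetical period

        /// Call only on successful iterations, mirroring the moved block.
        void onSuccess()
        {
            const auto now = std::chrono::system_clock::now();
            if (now - last_decrease_time > decrease_period)
            {
                std::lock_guard lock(mutex);
                error_count /= 2;          /// halve once per elapsed period
                last_decrease_time = now;
            }
        }
    };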
@@ -46,17 +46,20 @@ void StorageSystemServerSettings::fillData(MutableColumns & res_columns, Contex
         {"max_concurrent_insert_queries", std::to_string(context->getProcessList().getMaxInsertQueriesAmount())},
         {"max_concurrent_select_queries", std::to_string(context->getProcessList().getMaxSelectQueriesAmount())},
 
-        {"background_pool_size", std::to_string(context->getMergeMutateExecutor()->getMaxThreads())},
-        {"background_move_pool_size", std::to_string(context->getMovesExecutor()->getMaxThreads())},
-        {"background_fetches_pool_size", std::to_string(context->getFetchesExecutor()->getMaxThreads())},
-        {"background_common_pool_size", std::to_string(context->getCommonExecutor()->getMaxThreads())},
-
         {"background_buffer_flush_schedule_pool_size", std::to_string(CurrentMetrics::get(CurrentMetrics::BackgroundBufferFlushSchedulePoolSize))},
         {"background_schedule_pool_size", std::to_string(CurrentMetrics::get(CurrentMetrics::BackgroundSchedulePoolSize))},
         {"background_message_broker_schedule_pool_size", std::to_string(CurrentMetrics::get(CurrentMetrics::BackgroundMessageBrokerSchedulePoolSize))},
         {"background_distributed_schedule_pool_size", std::to_string(CurrentMetrics::get(CurrentMetrics::BackgroundDistributedSchedulePoolSize))}
     };
 
+    if (context->areBackgroundExecutorsInitialized())
+    {
+        updated.insert({"background_pool_size", std::to_string(context->getMergeMutateExecutor()->getMaxThreads())});
+        updated.insert({"background_move_pool_size", std::to_string(context->getMovesExecutor()->getMaxThreads())});
+        updated.insert({"background_fetches_pool_size", std::to_string(context->getFetchesExecutor()->getMaxThreads())});
+        updated.insert({"background_common_pool_size", std::to_string(context->getCommonExecutor()->getMaxThreads())});
+    }
+
     const auto & config = context->getConfigRef();
     ServerSettings settings;
     settings.loadSettingsFromConfig(config);
@@ -57,12 +57,13 @@ std::atomic<bool> signal_latch = false; /// Only need for thread sanitizer.
 
 /** Notes:
   * Only one query from the table can be processed at the moment of time.
-  * This is ensured by the mutex in fillData function.
+  * This is ensured by the mutex in StorageSystemStackTraceSource.
   * We obtain information about threads by sending signal and receiving info from the signal handler.
   * Information is passed via global variables and pipe is used for signaling.
   * Actually we can send all information via pipe, but we read from it with timeout just in case,
-  * so it's convenient to use is only for signaling.
+  * so it's convenient to use it only for signaling.
   */
+std::mutex mutex;
 
 StackTrace stack_trace{NoCapture{}};
 
@@ -189,7 +190,7 @@ ThreadIdToName getFilteredThreadNames(ASTPtr query, ContextPtr context, const Pa
         tid_to_name[tid] = thread_name;
         all_thread_names->insert(thread_name);
     }
-    LOG_TEST(log, "Read {} thread names for {} threads, took {} ms", tid_to_name.size(), thread_ids.size(), watch.elapsedMilliseconds());
+    LOG_TRACE(log, "Read {} thread names for {} threads, took {} ms", tid_to_name.size(), thread_ids.size(), watch.elapsedMilliseconds());
 
     Block block { ColumnWithTypeAndName(std::move(all_thread_names), std::make_shared<DataTypeString>(), "thread_name") };
     VirtualColumnUtils::filterBlockWithQuery(query, block, context);
@@ -229,6 +230,8 @@ public:
         , pipe_read_timeout_ms(static_cast<int>(context->getSettingsRef().storage_system_stack_trace_pipe_read_timeout_ms.totalMilliseconds()))
         , log(log_)
         , proc_it("/proc/self/task")
+        /// It shouldn't be possible to do concurrent reads from this table.
+        , lock(mutex)
     {
         /// Create a mask of what columns are needed in the result.
         NameSet names_set(column_names.begin(), column_names.end());
@@ -241,16 +244,13 @@ public:
 protected:
     Chunk generate() override
     {
-        /// It shouldn't be possible to do concurrent reads from this table.
-        std::lock_guard lock(mutex);
-
         MutableColumns res_columns = header.cloneEmptyColumns();
 
         ColumnPtr thread_ids;
         {
             Stopwatch watch;
             thread_ids = getFilteredThreadIds();
-            LOG_TEST(log, "Read {} threads, took {} ms", thread_ids->size(), watch.elapsedMilliseconds());
+            LOG_TRACE(log, "Read {} threads, took {} ms", thread_ids->size(), watch.elapsedMilliseconds());
         }
         if (thread_ids->empty())
             return Chunk();
@@ -332,7 +332,7 @@ protected:
             ++sequence_num;
         }
     }
-    LOG_TEST(log, "Send signal to {} threads (total), took {} ms", signals_sent, signals_sent_ms);
+    LOG_TRACE(log, "Send signal to {} threads (total), took {} ms", signals_sent, signals_sent_ms);
 
     UInt64 num_rows = res_columns.at(0)->size();
     Chunk chunk(std::move(res_columns), num_rows);
@@ -357,7 +357,7 @@ private:
     size_t signals_sent = 0;
     size_t signals_sent_ms = 0;
 
-    std::mutex mutex;
+    std::unique_lock<std::mutex> lock;
 
     ColumnPtr getFilteredThreadIds()
     {
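Note: the locking change above replaces a std::lock_guard taken inside each generate() call with a std::unique_lock member acquired in the source's constructor, so the mutex (now at file scope) is held for the whole lifetime of the reading pipeline: whole queries are serialized rather than individual chunks. A reduced sketch of the idiom, with hypothetical names:

    #include <mutex>

    std::mutex table_mutex;   /// one per table, like the file-scope mutex added above

    class Source
    {
    public:
        /// Acquired on construction, released on destruction, so no two
        /// Source objects (i.e. no two concurrent reads) can coexist.
        Source() : lock(table_mutex) {}

        void generate()
        {
            /// No locking needed here; the member lock is already held.
        }

    private:
        std::unique_lock<std::mutex> lock;
    };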
@@ -208,6 +208,13 @@ CI_CONFIG = CiConfig(
             static_binary_name="amd64compat",
             comment="SSE2-only build",
         ),
+        "binary_amd64_musl": BuildConfig(
+            name="binary_amd64_musl",
+            compiler="clang-17-amd64-musl",
+            package_type="binary",
+            static_binary_name="amd64musl",
+            comment="Build with Musl",
+        ),
         "binary_riscv64": BuildConfig(
             name="binary_riscv64",
             compiler="clang-17-riscv64",
@@ -249,6 +256,7 @@ CI_CONFIG = CiConfig(
                 "binary_riscv64",
                 "binary_s390x",
                 "binary_amd64_compat",
+                "binary_amd64_musl",
             ],
         },
     test_configs={
@@ -2205,7 +2205,7 @@ def reportLogStats(args):
         GROUP BY message_format_string
         ORDER BY count DESC
         LIMIT 100
-        FORMAT TSVWithNamesAndTypes
+        FORMAT PrettySpaceNoEscapes
     """
     value = clickhouse_execute(args, query).decode(errors="replace")
     print("\nTop patterns of log messages:\n")
@@ -2225,7 +2225,7 @@ def reportLogStats(args):
         GROUP BY pattern
         ORDER BY count DESC
         LIMIT 30
-        FORMAT TSVWithNamesAndTypes
+        FORMAT PrettySpaceNoEscapes
     """
     value = clickhouse_execute(args, query).decode(errors="replace")
     print("\nTop messages without format string (fmt::runtime):\n")
@@ -2238,10 +2238,10 @@ def reportLogStats(args):
         WHERE (now() - toIntervalMinute(240)) < event_time
             AND (message NOT LIKE (replaceRegexpAll(message_format_string, '{[:.0-9dfx]*}', '%') AS s))
             AND (message NOT LIKE concat('%Exception: ', s, '%'))
-        GROUP BY message_format_string ORDER BY count() DESC LIMIT 20 FORMAT TSVWithNamesAndTypes
+        GROUP BY message_format_string ORDER BY count() DESC LIMIT 20 FORMAT PrettySpaceNoEscapes
     """
     value = clickhouse_execute(args, query).decode(errors="replace")
-    print("\nTop messages that does not match its format string:\n")
+    print("\nTop messages not matching their format strings:\n")
     print(value)
     print("\n")
 
@@ -2269,13 +2269,13 @@ def reportLogStats(args):
             'Attempt to read after eof', 'String size is too big ({}), maximum: {}'
         ) AS known_short_messages
         SELECT count() AS c, message_format_string, substr(any(message), 1, 120),
-            min(if(length(regexpExtract(message, '(.*)\\([A-Z0-9_]+\\)')) as pref > 0, pref, length(message)) - 26 AS length_without_exception_boilerplate) AS min_length_without_exception_boilerplate
+            min(if(notEmpty(regexpExtract(message, '(.*)\\([A-Z0-9_]+\\)') as prefix), prefix, length(message)) - 26 AS length_without_exception_boilerplate) AS min_length_without_exception_boilerplate
         FROM system.text_log
         WHERE (now() - toIntervalMinute(240)) < event_time
             AND (length(message_format_string) < 16
-                OR (message ilike '%DB::Exception%' AND length_without_exception_boilerplate < 30))
+                OR (message ILIKE '%DB::Exception%' AND length_without_exception_boilerplate < 30))
             AND message_format_string NOT IN known_short_messages
-        GROUP BY message_format_string ORDER BY c DESC LIMIT 50 FORMAT TSVWithNamesAndTypes
+        GROUP BY message_format_string ORDER BY c DESC LIMIT 50 FORMAT PrettySpaceNoEscapes
     """
     value = clickhouse_execute(args, query).decode(errors="replace")
     print("\nTop short messages:\n")
@@ -193,11 +193,11 @@ class PostgresManager:
         database_name = self.database_or_default(database_name)
         self.drop_postgres_db(database_name)
         self.created_postgres_db_list.add(database_name)
-        self.cursor.execute(f"CREATE DATABASE {database_name}")
+        self.cursor.execute(f'CREATE DATABASE "{database_name}"')
 
     def drop_postgres_db(self, database_name=""):
         database_name = self.database_or_default(database_name)
-        self.cursor.execute(f"DROP DATABASE IF EXISTS {database_name} WITH (FORCE)")
+        self.cursor.execute(f'DROP DATABASE IF EXISTS "{database_name}" WITH (FORCE)')
         if database_name in self.created_postgres_db_list:
             self.created_postgres_db_list.remove(database_name)
 
@@ -216,19 +216,19 @@ class PostgresManager:
         if len(schema_name) == 0:
             self.instance.query(
                 f"""
-                CREATE DATABASE {database_name}
+                CREATE DATABASE \"{database_name}\"
                 ENGINE = PostgreSQL('{self.ip}:{self.port}', '{postgres_database}', 'postgres', 'mysecretpassword')"""
             )
         else:
             self.instance.query(
                 f"""
-                CREATE DATABASE {database_name}
+                CREATE DATABASE \"{database_name}\"
                 ENGINE = PostgreSQL('{self.ip}:{self.port}', '{postgres_database}', 'postgres', 'mysecretpassword', '{schema_name}')"""
             )
 
     def drop_clickhouse_postgres_db(self, database_name=""):
         database_name = self.database_or_default(database_name)
-        self.instance.query(f"DROP DATABASE IF EXISTS {database_name}")
+        self.instance.query(f'DROP DATABASE IF EXISTS "{database_name}"')
         if database_name in self.created_ch_postgres_db_list:
             self.created_ch_postgres_db_list.remove(database_name)
 
@@ -368,7 +368,7 @@ def check_tables_are_synchronized(
     result_query = f"select * from {table_path} order by {order_by};"
 
     expected = instance.query(
-        f"select * from {postgres_database}.{table_name} order by {order_by};"
+        f"select * from `{postgres_database}`.`{table_name}` order by {order_by};"
     )
     result = instance.query(result_query)
 
@@ -382,7 +382,7 @@ def check_tables_are_synchronized(
     if result != expected:
         count = int(instance.query(f"select count() from {table_path}"))
         expected_count = int(
-            instance.query(f"select count() from {postgres_database}.{table_name}")
+            instance.query(f"select count() from `{postgres_database}`.`{table_name}`")
         )
         print(f"Having {count}, expected {expected_count}")
     assert result == expected
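Note: the hunks above quote database and table identifiers (double quotes on the PostgreSQL side, backticks on the ClickHouse side) so that names containing special characters, such as the `postgres-postgres` database introduced in the next file, survive parsing. The same escaping exists on the C++ side; a sketch assuming ClickHouse's backQuote helper from Common/quoteString.h (the usage shown is illustrative, not code from this commit):

    #include <Common/quoteString.h>
    #include <string>

    /// backQuote wraps an identifier in backticks and escapes embedded ones,
    /// so a name like "postgres-postgres" stays a single identifier.
    std::string buildCountQuery(const std::string & database, const std::string & table)
    {
        return "SELECT count() FROM " + DB::backQuote(database) + "." + DB::backQuote(table);
    }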
@@ -59,6 +59,7 @@ instance2 = cluster.add_instance(
 pg_manager = PostgresManager()
 pg_manager2 = PostgresManager()
 pg_manager_instance2 = PostgresManager()
+pg_manager3 = PostgresManager()
 
 
 @pytest.fixture(scope="module")
@@ -81,6 +82,12 @@ def started_cluster():
         pg_manager2.init(
             instance2, cluster.postgres_ip, cluster.postgres_port, "postgres_database2"
         )
+        pg_manager3.init(
+            instance,
+            cluster.postgres_ip,
+            cluster.postgres_port,
+            default_database="postgres-postgres",
+        )
 
         yield cluster
 
@@ -915,6 +922,28 @@ def test_failed_load_from_snapshot(started_cluster):
     )
 
 
+def test_symbols_in_publication_name(started_cluster):
+    table = "test_symbols_in_publication_name"
+
+    pg_manager3.create_postgres_table(table)
+    instance.query(
+        f"INSERT INTO `{pg_manager3.get_default_database()}`.`{table}` SELECT number, number from numbers(0, 50)"
+    )
+
+    pg_manager3.create_materialized_db(
+        ip=started_cluster.postgres_ip,
+        port=started_cluster.postgres_port,
+        settings=[
+            f"materialized_postgresql_tables_list = '{table}'",
+            "materialized_postgresql_backoff_min_ms = 100",
+            "materialized_postgresql_backoff_max_ms = 100",
+        ],
+    )
+    check_tables_are_synchronized(
+        instance, table, postgres_database=pg_manager3.get_default_database()
+    )
+
+
 if __name__ == "__main__":
     cluster.start()
     input("Cluster created, press any key to destroy...")
@ -1,18 +0,0 @@
|
|||||||
-- { echo }
|
|
||||||
SELECT count() > 0 FROM system.stack_trace WHERE query_id != '' AND thread_name = 'TCPHandler';
|
|
||||||
1
|
|
||||||
-- opimization for not reading /proc/self/task/{}/comm and avoid sending signal
|
|
||||||
SELECT countIf(thread_id > 0) > 0 FROM system.stack_trace;
|
|
||||||
1
|
|
||||||
-- optimization for trace
|
|
||||||
SELECT length(trace) > 0 FROM system.stack_trace WHERE length(trace) > 0 LIMIT 1;
|
|
||||||
1
|
|
||||||
-- optimization for query_id
|
|
||||||
SELECT length(query_id) > 0 FROM system.stack_trace WHERE query_id != '' AND thread_name = 'TCPHandler' LIMIT 1;
|
|
||||||
1
|
|
||||||
-- optimization for thread_name
|
|
||||||
SELECT length(thread_name) > 0 FROM system.stack_trace WHERE thread_name != '' LIMIT 1;
|
|
||||||
1
|
|
||||||
-- enough rows (optimizations works "correctly")
|
|
||||||
SELECT count() > 100 FROM system.stack_trace;
|
|
||||||
1
|
|
@ -1,24 +0,0 @@
|
|||||||
-- Tags: no-parallel
|
|
||||||
-- Tag no-parallel: to decrease failure probability of collecting stack traces
|
|
||||||
|
|
||||||
-- Process one thread at a time
|
|
||||||
SET max_block_size = 1;
|
|
||||||
|
|
||||||
-- It is OK to have bigger timeout here since:
|
|
||||||
-- a) this test is marked as no-parallel
|
|
||||||
-- b) there is a filter by thread_name, so it will send signals only to the threads with the name TCPHandler
|
|
||||||
-- c) max_block_size is 1
|
|
||||||
SET storage_system_stack_trace_pipe_read_timeout_ms = 5000;
|
|
||||||
|
|
||||||
-- { echo }
|
|
||||||
SELECT count() > 0 FROM system.stack_trace WHERE query_id != '' AND thread_name = 'TCPHandler';
|
|
||||||
-- opimization for not reading /proc/self/task/{}/comm and avoid sending signal
|
|
||||||
SELECT countIf(thread_id > 0) > 0 FROM system.stack_trace;
|
|
||||||
-- optimization for trace
|
|
||||||
SELECT length(trace) > 0 FROM system.stack_trace WHERE length(trace) > 0 LIMIT 1;
|
|
||||||
-- optimization for query_id
|
|
||||||
SELECT length(query_id) > 0 FROM system.stack_trace WHERE query_id != '' AND thread_name = 'TCPHandler' LIMIT 1;
|
|
||||||
-- optimization for thread_name
|
|
||||||
SELECT length(thread_name) > 0 FROM system.stack_trace WHERE thread_name != '' LIMIT 1;
|
|
||||||
-- enough rows (optimizations works "correctly")
|
|
||||||
SELECT count() > 100 FROM system.stack_trace;
|
|
@ -1,4 +0,0 @@
|
|||||||
1
|
|
||||||
1
|
|
||||||
1
|
|
||||||
1
|
|
@ -1,40 +0,0 @@
#!/usr/bin/env bash

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

# NOTE: database = $CLICKHOUSE_DATABASE is unwanted
verify_sql="SELECT
    (SELECT sumIf(value, metric = 'PartsActive'), sumIf(value, metric = 'PartsOutdated') FROM system.metrics)
    = (SELECT sum(active), sum(NOT active) FROM
        (SELECT active FROM system.parts UNION ALL SELECT active FROM system.projection_parts))"

# The query is not atomic: it can compare states of system.parts and system.metrics from different points in time.
# So there is an inherent race condition, but the expected result should be reached eventually.
# If the test fails, this code will loop forever and hit the test timeout.
verify()
{
    while true
    do
        result=$( $CLICKHOUSE_CLIENT -m --query="$verify_sql" )
        [ "$result" = "1" ] && break
        sleep 0.1
    done
    echo 1
}

$CLICKHOUSE_CLIENT --database_atomic_wait_for_drop_and_detach_synchronously=1 --query="DROP TABLE IF EXISTS test_table"
$CLICKHOUSE_CLIENT --query="CREATE TABLE test_table(data Date) ENGINE = MergeTree PARTITION BY toYear(data) ORDER BY data;"

$CLICKHOUSE_CLIENT --query="INSERT INTO test_table VALUES ('1992-01-01')"
verify

$CLICKHOUSE_CLIENT --query="INSERT INTO test_table VALUES ('1992-01-02')"
verify

$CLICKHOUSE_CLIENT --query="OPTIMIZE TABLE test_table FINAL"
verify

$CLICKHOUSE_CLIENT --database_atomic_wait_for_drop_and_detach_synchronously=1 --query="DROP TABLE test_table"
verify
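The verify() loop above is a general poll-until-consistent pattern for racy invariants. A standalone sketch of the same idea, with an added attempt cap instead of relying on the global test timeout (the function name and cap are illustrative, not part of the diff):

    # Re-run a boolean SQL check until it returns 1; give up after $2 attempts.
    wait_until_true() {
        local sql="$1" attempts="${2:-600}"
        for ((i = 0; i < attempts; i++)); do
            [ "$($CLICKHOUSE_CLIENT -q "$sql")" = "1" ] && return 0
            sleep 0.1
        done
        return 1
    }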
@ -1,46 +0,0 @@
#!/usr/bin/env bash
# Tags: no-parallel, long, no-random-settings

CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh


total_iterations=16
parallelism=32

$CLICKHOUSE_CLIENT --query='DROP TABLE IF EXISTS test_inserts'
$CLICKHOUSE_CLIENT --query='CREATE TABLE test_inserts ENGINE=Null AS system.numbers'

run_query() {
    ( $CLICKHOUSE_CLIENT --query='SELECT * FROM numbers_mt(1000000) FORMAT CSV' | $CLICKHOUSE_CLIENT --max_threads 8 --max_memory_usage_for_user 1073741824 -q 'INSERT INTO test_inserts FORMAT CSV' 2>/dev/null )
}

for ((i = 1; i <= total_iterations; i++)); do
    for ((j = 1; j <= parallelism; j++)); do
        run_query & pids+=($!)
    done

    EXIT_CODE=0
    new_pids=()
    for pid in "${pids[@]:0:parallelism}"; do
        CODE=0
        wait "${pid}" || CODE=$?
        run_query & new_pids+=($!)
        if [[ "${CODE}" != "0" ]]; then
            EXIT_CODE=1
        fi
    done
    for pid in "${pids[@]:parallelism}"; do
        CODE=0
        wait "${pid}" || CODE=$?
        if [[ "${CODE}" != "0" ]]; then
            EXIT_CODE=1
        fi
    done
    pids=("${new_pids[@]}")

    if [[ $EXIT_CODE -ne 0 ]]; then
        exit $EXIT_CODE
    fi
done
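The script above keeps roughly $parallelism inserts in flight under a shared 1 GiB per-user memory cap, starting a replacement job as soon as one finishes. As a hedged, standalone illustration of that cap (the query is deliberately memory-hungry and illustrative only; whether it fails depends on server state):

    # All sessions of one user share the max_memory_usage_for_user budget, so a
    # single query materializing ~800 MB of UInt64s is likely to exceed 1 GiB.
    $CLICKHOUSE_CLIENT --max_memory_usage_for_user 1073741824 \
        -q "SELECT groupArray(number) FROM numbers(100000000) FORMAT Null"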
7
tests/queries/0_stateless/02933_local_system_setting.sh
Executable file
@ -0,0 +1,7 @@
#!/usr/bin/env bash

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

$CLICKHOUSE_LOCAL -q "select * from system.server_settings format Null;"
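The new test only checks that clickhouse-local can read system.server_settings at all (hence FORMAT Null). A hedged follow-up sketch for interactive use; the setting name is just an example of a row that is normally present:

    $CLICKHOUSE_LOCAL -q "SELECT name, value, description FROM system.server_settings WHERE name = 'max_connections'"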
@ -0,0 +1,24 @@
TSV
Content-Type: text/tab-separated-values; charset=UTF-8
TabSeparatedWithNamesAndTypes
Content-Type: text/tab-separated-values; charset=UTF-8
CSV
Content-Type: text/csv; charset=UTF-8; header=absent
CSVWithNames
Content-Type: text/csv; charset=UTF-8; header=present
Null
Content-Type: text/plain; charset=UTF-8
Native
Content-Type: application/octet-stream
RowBinary
Content-Type: application/octet-stream
JSONStrings
Content-Type: application/json; charset=UTF-8
JSON
Content-Type: application/json; charset=UTF-8
JSONEachRow
Content-Type: application/x-ndjson; charset=UTF-8
Values
Content-Type: text/plain; charset=UTF-8
Vertical
Content-Type: text/plain; charset=UTF-8
@ -0,0 +1,13 @@
#!/usr/bin/env bash

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

for frmt in TSV TabSeparatedWithNamesAndTypes CSV CSVWithNames Null Native RowBinary JSONStrings JSON JSONEachRow Values Vertical
do
    echo $frmt
    url="${CLICKHOUSE_URL}/?http_headers_progress_interval_ms=1&send_progress_in_http_headers=true&query=select+sleepEachRow(0.01)from+numbers(10)+FORMAT+${frmt}"
    (seq 1 200 | xargs -n1 -P0 -Ixxx curl -Ss -v -o /dev/null ${url} 2>&1 | grep -Eo " Content-Type:.*$") | strings | sort -u
done
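The loop above hammers each format with 200 parallel requests to catch Content-Type mixups while progress headers are being sent. A hedged single-format spot check, assuming the same $CLICKHOUSE_URL helper from shell_config.sh:

    # Fetch only the response headers and extract the advertised Content-Type.
    curl -sS -o /dev/null -D - "${CLICKHOUSE_URL}/?query=SELECT+1+FORMAT+JSON" \
        | grep -i '^content-type'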
@ -1 +0,0 @@
0 0
@ -1,36 +0,0 @@
#!/usr/bin/env bash
# Tags: no-parallel, no-random-settings, long

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh


# The test assumes that the whole table resides in the cache, but `hits_s3` has only 128Mi of cache.
# So we need to create a smaller table.
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS hits_s3_sampled"
$CLICKHOUSE_CLIENT -q "CREATE TABLE hits_s3_sampled AS test.hits_s3"
$CLICKHOUSE_CLIENT -q "INSERT INTO hits_s3_sampled SELECT * FROM test.hits_s3 SAMPLE 0.01"
$CLICKHOUSE_CLIENT -q "OPTIMIZE TABLE hits_s3_sampled FINAL"

$CLICKHOUSE_CLIENT -q "SYSTEM DROP FILESYSTEM CACHE"

# Warm up the cache
$CLICKHOUSE_CLIENT -q "SELECT * FROM hits_s3_sampled WHERE URL LIKE '%google%' ORDER BY EventTime LIMIT 10 FORMAT Null"
$CLICKHOUSE_CLIENT -q "SELECT * FROM hits_s3_sampled WHERE URL LIKE '%google%' ORDER BY EventTime LIMIT 10 FORMAT Null"

query_id=02906_read_from_cache_$RANDOM
$CLICKHOUSE_CLIENT --query_id ${query_id} -q "SELECT * FROM hits_s3_sampled WHERE URL LIKE '%google%' ORDER BY EventTime LIMIT 10 FORMAT Null"

$CLICKHOUSE_CLIENT -nq "
    SYSTEM FLUSH LOGS;

    -- AsynchronousReaderIgnoredBytes = 0: no seek-avoiding happened
    -- CachedReadBufferReadFromSourceBytes = 0: sanity check to ensure we read only from the cache
    SELECT ProfileEvents['AsynchronousReaderIgnoredBytes'], ProfileEvents['CachedReadBufferReadFromSourceBytes']
    FROM system.query_log
    WHERE query_id = '$query_id' AND type = 'QueryFinish' AND event_date >= yesterday() AND current_database = currentDatabase()
"

$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS hits_s3_sampled"
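A hedged sketch for inspecting the same cache counters outside the test, assuming logs have been flushed; the extra CachedReadBufferReadFromCacheBytes column shows how many bytes actually came from the cache:

    $CLICKHOUSE_CLIENT -q "
        SELECT query_id,
               ProfileEvents['CachedReadBufferReadFromCacheBytes']  AS from_cache,
               ProfileEvents['CachedReadBufferReadFromSourceBytes'] AS from_source
        FROM system.query_log
        WHERE type = 'QueryFinish' AND event_date >= yesterday()
        ORDER BY event_time DESC
        LIMIT 5"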