Merge branch 'master' into compress_marks_and_primary_key

This commit is contained in:
zhongyuankai 2022-08-30 22:10:19 +08:00 committed by GitHub
commit 0fe79942be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
47 changed files with 586 additions and 230 deletions

View File

@ -0,0 +1,22 @@
#define _GNU_SOURCE
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include "syscall.h"
int dup3(int old, int new, int flags)
{
int r;
#ifdef SYS_dup2
if (old==new) return __syscall_ret(-EINVAL);
if (flags & O_CLOEXEC) {
while ((r=__syscall(SYS_dup3, old, new, flags))==-EBUSY);
if (r!=-ENOSYS) return __syscall_ret(r);
}
while ((r=__syscall(SYS_dup2, old, new))==-EBUSY);
if (flags & O_CLOEXEC) __syscall(SYS_fcntl, new, F_SETFD, FD_CLOEXEC);
#else
while ((r=__syscall(SYS_dup3, old, new, flags))==-EBUSY);
#endif
return __syscall_ret(r);
}

View File

@ -0,0 +1,26 @@
#include <sys/inotify.h>
#include <errno.h>
#include "syscall.h"
int inotify_init()
{
return inotify_init1(0);
}
int inotify_init1(int flags)
{
int r = __syscall(SYS_inotify_init1, flags);
#ifdef SYS_inotify_init
if (r==-ENOSYS && !flags) r = __syscall(SYS_inotify_init);
#endif
return __syscall_ret(r);
}
int inotify_add_watch(int fd, const char *pathname, uint32_t mask)
{
return syscall(SYS_inotify_add_watch, fd, pathname, mask);
}
int inotify_rm_watch(int fd, int wd)
{
return syscall(SYS_inotify_rm_watch, fd, wd);
}

2
contrib/libuv vendored

@ -1 +1 @@
Subproject commit 95081e7c16c9857babe6d4e2bc1c779198ea89ae
Subproject commit 3a85b2eb3d83f369b8a8cafd329d7e9dc28f60cf

View File

@ -15,6 +15,7 @@ set(uv_sources
src/inet.c
src/random.c
src/strscpy.c
src/strtok.c
src/threadpool.c
src/timer.c
src/uv-common.c
@ -75,13 +76,13 @@ if(CMAKE_SYSTEM_NAME STREQUAL "Linux")
list(APPEND uv_defines _GNU_SOURCE _POSIX_C_SOURCE=200112)
list(APPEND uv_libraries rt)
list(APPEND uv_sources
src/unix/epoll.c
src/unix/linux-core.c
src/unix/linux-inotify.c
src/unix/linux-syscalls.c
src/unix/procfs-exepath.c
src/unix/random-getrandom.c
src/unix/random-sysctl-linux.c
src/unix/sysinfo-loadavg.c)
src/unix/random-sysctl-linux.c)
endif()
if(CMAKE_SYSTEM_NAME STREQUAL "NetBSD")
@ -111,6 +112,7 @@ if(CMAKE_SYSTEM_NAME STREQUAL "OS/390")
src/unix/pthread-fixes.c
src/unix/pthread-barrier.c
src/unix/os390.c
src/unix/os390-proctitle.c
src/unix/os390-syscalls.c)
endif()

View File

@ -4,7 +4,7 @@ sidebar_position: 50
sidebar_label: MySQL
---
# MySQL
# MySQL
Allows to connect to databases on a remote MySQL server and perform `INSERT` and `SELECT` queries to exchange data between ClickHouse and MySQL.
@ -99,7 +99,7 @@ mysql> select * from mysql_table;
Database in ClickHouse, exchanging data with the MySQL server:
``` sql
CREATE DATABASE mysql_db ENGINE = MySQL('localhost:3306', 'test', 'my_user', 'user_password')
CREATE DATABASE mysql_db ENGINE = MySQL('localhost:3306', 'test', 'my_user', 'user_password') SETTINGS read_write_timeout=10000, connect_timeout=100;
```
``` sql

View File

@ -495,7 +495,7 @@ If the s string is non-empty and does not contain the c character at
Returns the string s that was converted from the encoding in from to the encoding in to.
## Base58Encode(plaintext), Base58Decode(encoded_text)
## base58Encode(plaintext), base58Decode(encoded_text)
Accepts a String and encodes/decodes it using [Base58](https://tools.ietf.org/id/draft-msporny-base58-01.html) encoding scheme using "Bitcoin" alphabet.
@ -523,7 +523,7 @@ Query:
``` sql
SELECT base58Encode('Encoded');
SELECT base58Encode('3dc8KtHrwM');
SELECT base58Decode('3dc8KtHrwM');
```
Result:

View File

@ -16,7 +16,7 @@ sidebar_label: "Функции для работы со строками"
empty(x)
```
Строка считается непустой, если содержит хотя бы один байт, пусть даже это пробел или нулевой байт.
Строка считается непустой, если содержит хотя бы один байт, пусть даже это пробел или нулевой байт.
Функция также поддерживает работу с типами [Array](array-functions.md#function-empty) и [UUID](uuid-functions.md#empty).
@ -56,7 +56,7 @@ SELECT empty('text');
notEmpty(x)
```
Строка считается непустой, если содержит хотя бы один байт, пусть даже это пробел или нулевой байт.
Строка считается непустой, если содержит хотя бы один байт, пусть даже это пробел или нулевой байт.
Функция также поддерживает работу с типами [Array](array-functions.md#function-notempty) и [UUID](uuid-functions.md#notempty).
@ -491,21 +491,21 @@ SELECT concat(key1, key2), sum(value) FROM key_val GROUP BY (key1, key2);
Возвращает сконвертированную из кодировки from в кодировку to строку s.
## Base58Encode(plaintext), Base58Decode(encoded_text) {#base58}
## base58Encode(plaintext), base58Decode(encoded_text) {#base58}
Принимает на вход строку или колонку строк и кодирует/раскодирует их с помощью схемы кодирования [Base58](https://tools.ietf.org/id/draft-msporny-base58-01.html) с использованием стандартного алфавита Bitcoin.
**Синтаксис**
```sql
encodeBase58(decoded)
decodeBase58(encoded)
base58Encode(decoded)
base58Decode(encoded)
```
**Аргументы**
- `decoded` — Колонка или строка типа [String](../../sql-reference/data-types/string.md).
- `encoded` — Колонка или строка типа [String](../../sql-reference/data-types/string.md). Если входная строка не является корректным кодом для какой-либо другой строки, возникнет исключение `1001`.
- `encoded` — Колонка или строка типа [String](../../sql-reference/data-types/string.md). Если входная строка не является корректным кодом для какой-либо другой строки, возникнет исключение.
**Возвращаемое значение**
@ -518,18 +518,18 @@ decodeBase58(encoded)
Запрос:
``` sql
SELECT encodeBase58('encode');
SELECT decodeBase58('izCFiDUY');
SELECT base58Encode('Encoded');
SELECT base58Decode('3dc8KtHrwM');
```
Результат:
```text
┌─encodeBase58('encode', 'flickr')─┐
SvyTHb1D
└──────────────────────────────────
┌─decodeBase58('izCFiDUY', 'ripple')─┐
decode
└────────────────────────────────────
┌─base58Encode('Encoded')─┐
3dc8KtHrwM
└─────────────────────────┘
┌─base58Decode('3dc8KtHrwM')─┐
Encoded
└────────────────────────────┘
```
## base64Encode(s) {#base64encode}

View File

@ -79,7 +79,7 @@ public:
/// No user, probably the user has been dropped while it was in the cache.
cache.remove(params);
}
auto res = std::shared_ptr<ContextAccess>(new ContextAccess(access_control, params));
auto res = ContextAccess::make(access_control, params);
res->initialize();
cache.add(params, res);
return res;

View File

@ -110,7 +110,7 @@ namespace
}
/// Returns the host name by its address.
Strings getHostsByAddress(const IPAddress & address)
std::unordered_set<String> getHostsByAddress(const IPAddress & address)
{
auto hosts = DNSResolver::instance().reverseResolve(address);
@ -526,7 +526,7 @@ bool AllowedClientHosts::contains(const IPAddress & client_address) const
return true;
/// Check `name_regexps`.
std::optional<Strings> resolved_hosts;
std::optional<std::unordered_set<String>> resolved_hosts;
auto check_name_regexp = [&](const String & name_regexp_)
{
try

View File

@ -410,7 +410,7 @@ std::shared_ptr<const ContextAccess> ContextAccess::getFullAccess()
{
static const std::shared_ptr<const ContextAccess> res = []
{
auto full_access = std::shared_ptr<ContextAccess>(new ContextAccess);
auto full_access = ContextAccess::make();
full_access->is_full_access = true;
full_access->access = std::make_shared<AccessRights>(AccessRights::getFullAccess());
full_access->access_with_implicit = full_access->access;

View File

@ -69,6 +69,9 @@ public:
using Params = ContextAccessParams;
const Params & getParams() const { return params; }
ContextAccess() { } /// NOLINT
ContextAccess(const AccessControl & access_control_, const Params & params_);
/// Returns the current user. Throws if user is nullptr.
UserPtr getUser() const;
/// Same as above, but can return nullptr.
@ -163,12 +166,16 @@ public:
/// without any limitations. This is used for the global context.
static std::shared_ptr<const ContextAccess> getFullAccess();
template <typename... Args>
static std::shared_ptr<ContextAccess> make(Args &&... args)
{
return std::make_shared<ContextAccess>(std::forward<Args>(args)...);
}
~ContextAccess();
private:
friend class AccessControl;
ContextAccess() {} /// NOLINT
ContextAccess(const AccessControl & access_control_, const Params & params_);
void initialize();
void setUser(const UserPtr & user_) const;

View File

@ -15,13 +15,23 @@ namespace DB
static void callback(void * arg, int status, int, struct hostent * host)
{
auto * ptr_records = reinterpret_cast<std::vector<std::string>*>(arg);
auto * ptr_records = reinterpret_cast<std::unordered_set<std::string>*>(arg);
if (status == ARES_SUCCESS && host->h_aliases)
{
/*
* In some cases (e.g /etc/hosts), hostent::h_name is filled and hostent::h_aliases is empty.
* Thus, we can't rely solely on hostent::h_aliases. More info on:
* https://github.com/ClickHouse/ClickHouse/issues/40595#issuecomment-1230526931
* */
if (auto * ptr_record = host->h_name)
{
ptr_records->insert(ptr_record);
}
int i = 0;
while (auto * ptr_record = host->h_aliases[i])
{
ptr_records->emplace_back(ptr_record);
ptr_records->insert(ptr_record);
i++;
}
}
@ -58,9 +68,9 @@ namespace DB
* */
}
std::vector<std::string> CaresPTRResolver::resolve(const std::string & ip)
std::unordered_set<std::string> CaresPTRResolver::resolve(const std::string & ip)
{
std::vector<std::string> ptr_records;
std::unordered_set<std::string> ptr_records;
resolve(ip, ptr_records);
wait();
@ -68,9 +78,9 @@ namespace DB
return ptr_records;
}
std::vector<std::string> CaresPTRResolver::resolve_v6(const std::string & ip)
std::unordered_set<std::string> CaresPTRResolver::resolve_v6(const std::string & ip)
{
std::vector<std::string> ptr_records;
std::unordered_set<std::string> ptr_records;
resolve_v6(ip, ptr_records);
wait();
@ -78,7 +88,7 @@ namespace DB
return ptr_records;
}
void CaresPTRResolver::resolve(const std::string & ip, std::vector<std::string> & response)
void CaresPTRResolver::resolve(const std::string & ip, std::unordered_set<std::string> & response)
{
in_addr addr;
@ -87,7 +97,7 @@ namespace DB
ares_gethostbyaddr(channel, reinterpret_cast<const void*>(&addr), sizeof(addr), AF_INET, callback, &response);
}
void CaresPTRResolver::resolve_v6(const std::string & ip, std::vector<std::string> & response)
void CaresPTRResolver::resolve_v6(const std::string & ip, std::unordered_set<std::string> & response)
{
in6_addr addr;
inet_pton(AF_INET6, ip.c_str(), &addr);

View File

@ -25,16 +25,16 @@ namespace DB
explicit CaresPTRResolver(provider_token);
~CaresPTRResolver() override;
std::vector<std::string> resolve(const std::string & ip) override;
std::unordered_set<std::string> resolve(const std::string & ip) override;
std::vector<std::string> resolve_v6(const std::string & ip) override;
std::unordered_set<std::string> resolve_v6(const std::string & ip) override;
private:
void wait();
void resolve(const std::string & ip, std::vector<std::string> & response);
void resolve(const std::string & ip, std::unordered_set<std::string> & response);
void resolve_v6(const std::string & ip, std::vector<std::string> & response);
void resolve_v6(const std::string & ip, std::unordered_set<std::string> & response);
ares_channel channel;
};

View File

@ -1,7 +1,7 @@
#pragma once
#include <string>
#include <vector>
#include <unordered_set>
namespace DB
{
@ -10,9 +10,9 @@ namespace DB
virtual ~DNSPTRResolver() = default;
virtual std::vector<std::string> resolve(const std::string & ip) = 0;
virtual std::unordered_set<std::string> resolve(const std::string & ip) = 0;
virtual std::vector<std::string> resolve_v6(const std::string & ip) = 0;
virtual std::unordered_set<std::string> resolve_v6(const std::string & ip) = 0;
};
}

View File

@ -136,7 +136,7 @@ static DNSResolver::IPAddresses resolveIPAddressImpl(const std::string & host)
return addresses;
}
static Strings reverseResolveImpl(const Poco::Net::IPAddress & address)
static std::unordered_set<String> reverseResolveImpl(const Poco::Net::IPAddress & address)
{
auto ptr_resolver = DB::DNSPTRResolverProvider::get();
@ -234,7 +234,7 @@ std::vector<Poco::Net::SocketAddress> DNSResolver::resolveAddressList(const std:
return addresses;
}
Strings DNSResolver::reverseResolve(const Poco::Net::IPAddress & address)
std::unordered_set<String> DNSResolver::reverseResolve(const Poco::Net::IPAddress & address)
{
if (impl->disable_cache)
return reverseResolveImpl(address);

View File

@ -37,7 +37,7 @@ public:
std::vector<Poco::Net::SocketAddress> resolveAddressList(const std::string & host, UInt16 port);
/// Accepts host IP and resolves its host names
Strings reverseResolve(const Poco::Net::IPAddress & address);
std::unordered_set<String> reverseResolve(const Poco::Net::IPAddress & address);
/// Get this server host name
String getHostName();

View File

@ -71,10 +71,15 @@ bool FileCache::isReadOnly()
return !isQueryInitialized();
}
void FileCache::assertInitialized() const
void FileCache::assertInitialized(std::lock_guard<std::mutex> & /* cache_lock */) const
{
if (!is_initialized)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Cache not initialized");
{
if (initialization_exception)
std::rethrow_exception(initialization_exception);
else
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Cache not initialized");
}
}
void FileCache::initialize()
@ -90,28 +95,43 @@ void FileCache::initialize()
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
initialization_exception = std::current_exception();
throw;
}
}
else
{
fs::create_directories(cache_base_path);
}
is_initialized = true;
}
}
void FileCache::useCell(
const FileSegmentCell & cell, FileSegments & result, std::lock_guard<std::mutex> & cache_lock) const
const FileSegmentCell & cell, FileSegments & result, std::lock_guard<std::mutex> & cache_lock)
{
auto file_segment = cell.file_segment;
if (file_segment->isDownloaded()
&& fs::file_size(getPathInLocalCache(file_segment->key(), file_segment->offset(), file_segment->isPersistent())) == 0)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot have zero size downloaded file segments. {}",
file_segment->getInfoForLog());
if (file_segment->isDownloaded())
{
fs::path path = file_segment->getPathInLocalCache();
if (!fs::exists(path))
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"File path does not exist, but file has DOWNLOADED state. {}",
file_segment->getInfoForLog());
}
if (fs::file_size(path) == 0)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot have zero size downloaded file segments. {}",
file_segment->getInfoForLog());
}
}
result.push_back(cell.file_segment);
@ -222,7 +242,12 @@ FileSegments FileCache::getImpl(
}
FileSegments FileCache::splitRangeIntoCells(
const Key & key, size_t offset, size_t size, FileSegment::State state, bool is_persistent, std::lock_guard<std::mutex> & cache_lock)
const Key & key,
size_t offset,
size_t size,
FileSegment::State state,
bool is_persistent,
std::lock_guard<std::mutex> & cache_lock)
{
assert(size > 0);
@ -346,16 +371,16 @@ void FileCache::fillHolesWithEmptyFileSegments(
FileSegmentsHolder FileCache::getOrSet(const Key & key, size_t offset, size_t size, bool is_persistent)
{
assertInitialized();
FileSegment::Range range(offset, offset + size - 1);
std::lock_guard cache_lock(mutex);
assertInitialized(cache_lock);
#ifndef NDEBUG
assertCacheCorrectness(key, cache_lock);
#endif
FileSegment::Range range(offset, offset + size - 1);
/// Get all segments which intersect with the given range.
auto file_segments = getImpl(key, range, cache_lock);
@ -374,16 +399,16 @@ FileSegmentsHolder FileCache::getOrSet(const Key & key, size_t offset, size_t si
FileSegmentsHolder FileCache::get(const Key & key, size_t offset, size_t size)
{
assertInitialized();
FileSegment::Range range(offset, offset + size - 1);
std::lock_guard cache_lock(mutex);
assertInitialized(cache_lock);
#ifndef NDEBUG
assertCacheCorrectness(key, cache_lock);
#endif
FileSegment::Range range(offset, offset + size - 1);
/// Get all segments which intersect with the given range.
auto file_segments = getImpl(key, range, cache_lock);
@ -417,7 +442,7 @@ FileCache::FileSegmentCell * FileCache::addCell(
if (files[key].contains(offset))
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cache already exists for key: `{}`, offset: {}, size: {}.\nCurrent cache structure: {}",
"Cache cell already exists for key: `{}`, offset: {}, size: {}.\nCurrent cache structure: {}",
key.toString(), offset, size, dumpStructureUnlocked(key, cache_lock));
auto skip_or_download = [&]() -> FileSegmentPtr
@ -605,9 +630,7 @@ bool FileCache::tryReserve(const Key & key, size_t offset, size_t size, std::loc
auto remove_file_segment = [&](FileSegmentPtr file_segment, size_t file_segment_size)
{
query_context->remove(file_segment->key(), file_segment->offset(), file_segment_size, cache_lock);
std::lock_guard segment_lock(file_segment->mutex);
remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock);
remove(file_segment, cache_lock);
};
assert(trash.empty());
@ -724,19 +747,13 @@ bool FileCache::tryReserveForMainList(
}
}
auto remove_file_segment = [&](FileSegmentPtr file_segment)
{
std::lock_guard segment_lock(file_segment->mutex);
remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock);
};
/// This case is very unlikely, can happen in case of exception from
/// file_segment->complete(), which would be a logical error.
assert(trash.empty());
for (auto & cell : trash)
{
if (auto file_segment = cell->file_segment)
remove_file_segment(file_segment);
remove(file_segment, cache_lock);
}
if (is_overflow())
@ -758,7 +775,7 @@ bool FileCache::tryReserveForMainList(
for (auto & cell : to_evict)
{
if (auto file_segment = cell->file_segment)
remove_file_segment(file_segment);
remove(file_segment, cache_lock);
}
if (main_priority->getCacheSize(cache_lock) > (1ull << 63))
@ -772,10 +789,10 @@ bool FileCache::tryReserveForMainList(
void FileCache::removeIfExists(const Key & key)
{
assertInitialized();
std::lock_guard cache_lock(mutex);
assertInitialized(cache_lock);
auto it = files.find(key);
if (it == files.end())
return;
@ -869,27 +886,36 @@ void FileCache::removeIfReleasable()
#endif
}
void FileCache::remove(FileSegmentPtr file_segment, std::lock_guard<std::mutex> & cache_lock)
{
std::lock_guard segment_lock(file_segment->mutex);
remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock);
}
void FileCache::remove(
Key key, size_t offset,
std::lock_guard<std::mutex> & cache_lock, std::lock_guard<std::mutex> & /* segment_lock */)
{
LOG_DEBUG(log, "Remove from cache. Key: {}, offset: {}", key.toString(), offset);
auto * cell = getCell(key, offset, cache_lock);
if (!cell)
throw Exception(ErrorCodes::LOGICAL_ERROR, "No cache cell for key: {}, offset: {}", key.toString(), offset);
String cache_file_path;
bool is_persistent_file_segment = cell->file_segment->isPersistent();
if (cell->queue_iterator)
{
cell->queue_iterator->removeAndGetNext(cache_lock);
auto * cell = getCell(key, offset, cache_lock);
if (!cell)
throw Exception(ErrorCodes::LOGICAL_ERROR, "No cache cell for key: {}, offset: {}", key.toString(), offset);
if (cell->queue_iterator)
{
cell->queue_iterator->removeAndGetNext(cache_lock);
}
cache_file_path = cell->file_segment->getPathInLocalCache();
}
auto & offsets = files[key];
offsets.erase(offset);
auto cache_file_path = getPathInLocalCache(key, offset, is_persistent_file_segment);
if (fs::exists(cache_file_path))
{
try
@ -908,9 +934,10 @@ void FileCache::remove(
}
catch (...)
{
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"Removal of cached file failed. Key: {}, offset: {}, path: {}, error: {}",
key.toString(), offset, cache_file_path, getCurrentExceptionMessage(false));
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"Removal of cached file failed. Key: {}, offset: {}, path: {}, error: {}",
key.toString(), offset, cache_file_path, getCurrentExceptionMessage(false));
}
}
}
@ -1139,9 +1166,10 @@ FileCache::FileSegmentCell::FileSegmentCell(
break;
}
default:
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"Can create cell with either EMPTY, DOWNLOADED, DOWNLOADING state, got: {}",
FileSegment::stateToString(file_segment->download_state));
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"Can create cell with either EMPTY, DOWNLOADED, DOWNLOADING state, got: {}",
FileSegment::stateToString(file_segment->download_state));
}
}

View File

@ -140,7 +140,9 @@ private:
bool enable_filesystem_query_cache_limit;
Poco::Logger * log;
bool is_initialized = false;
std::exception_ptr initialization_exception;
mutable std::mutex mutex;
@ -152,6 +154,10 @@ private:
std::lock_guard<std::mutex> & cache_lock,
std::lock_guard<std::mutex> & segment_lock);
void remove(
FileSegmentPtr file_segment,
std::lock_guard<std::mutex> & cache_lock);
bool isLastFileSegmentHolder(
const Key & key,
size_t offset,
@ -164,7 +170,7 @@ private:
std::lock_guard<std::mutex> & cache_lock,
std::lock_guard<std::mutex> & segment_lock);
void assertInitialized() const;
void assertInitialized(std::lock_guard<std::mutex> & cache_lock) const;
struct FileSegmentCell : private boost::noncopyable
{
@ -220,7 +226,7 @@ private:
bool is_persistent,
std::lock_guard<std::mutex> & cache_lock);
void useCell(const FileSegmentCell & cell, FileSegments & result, std::lock_guard<std::mutex> & cache_lock) const;
static void useCell(const FileSegmentCell & cell, FileSegments & result, std::lock_guard<std::mutex> & cache_lock);
bool tryReserveForMainList(
const Key & key,

View File

@ -78,19 +78,19 @@ FileSegment::State FileSegment::state() const
size_t FileSegment::getDownloadOffset() const
{
std::lock_guard segment_lock(mutex);
return range().left + getDownloadedSize(segment_lock);
return range().left + getDownloadedSizeUnlocked(segment_lock);
}
size_t FileSegment::getDownloadedSize() const
{
std::lock_guard segment_lock(mutex);
return getDownloadedSize(segment_lock);
return getDownloadedSizeUnlocked(segment_lock);
}
size_t FileSegment::getRemainingSizeToDownload() const
{
std::lock_guard segment_lock(mutex);
return range().size() - downloaded_size;
return range().size() - getDownloadedSizeUnlocked(segment_lock);
}
bool FileSegment::isDetached() const
@ -99,7 +99,7 @@ bool FileSegment::isDetached() const
return is_detached;
}
size_t FileSegment::getDownloadedSize(std::lock_guard<std::mutex> & /* segment_lock */) const
size_t FileSegment::getDownloadedSizeUnlocked(std::lock_guard<std::mutex> & /* segment_lock */) const
{
if (download_state == State::DOWNLOADED)
return downloaded_size;
@ -159,7 +159,7 @@ void FileSegment::resetDownloader()
void FileSegment::resetDownloaderImpl(std::lock_guard<std::mutex> & segment_lock)
{
if (downloaded_size == range().size())
if (getDownloadedSizeUnlocked(segment_lock) == range().size())
setDownloaded(segment_lock);
else
download_state = State::PARTIALLY_DOWNLOADED;
@ -241,14 +241,16 @@ void FileSegment::write(const char * from, size_t size, size_t offset_)
"Not enough space is reserved. Available: {}, expected: {}", availableSize(), size);
if (!isDownloader())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Only downloader can do the downloading. (CallerId: {}, DownloaderId: {})",
getCallerId(), downloader_id);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Only downloader can do the downloading. (CallerId: {}, DownloaderId: {})",
getCallerId(), downloader_id);
if (downloaded_size == range().size())
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"Attempt to write {} bytes to offset: {}, but current file segment is already fully downloaded",
size, offset_);
if (getDownloadedSize() == range().size())
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"Attempt to write {} bytes to offset: {}, but current file segment is already fully downloaded",
size, offset_);
auto download_offset = range().left + downloaded_size;
if (offset_ != download_offset)
@ -331,9 +333,9 @@ FileSegment::State FileSegment::wait()
return download_state;
}
bool FileSegment::reserve(size_t size)
bool FileSegment::reserve(size_t size_to_reserve)
{
if (!size)
if (!size_to_reserve)
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Zero space reservation is not allowed");
{
@ -350,12 +352,16 @@ bool FileSegment::reserve(size_t size)
caller_id, downloader_id);
}
if (downloaded_size + size > range().size())
throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"Attempt to reserve space too much space ({}) for file segment with range: {} (downloaded size: {})",
size, range().toString(), downloaded_size);
size_t current_downloaded_size = getDownloadedSizeUnlocked(segment_lock);
if (current_downloaded_size + size_to_reserve > range().size())
{
throw Exception(
ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
"Attempt to reserve space too much space: {} ({})",
size_to_reserve, getInfoForLogImpl(segment_lock));
}
assert(reserved_size >= downloaded_size);
assert(reserved_size >= current_downloaded_size);
}
/**
@ -363,29 +369,45 @@ bool FileSegment::reserve(size_t size)
* in case previous downloader did not fully download current file_segment
* and the caller is going to continue;
*/
size_t free_space = reserved_size - downloaded_size;
size_t size_to_reserve = size - free_space;
std::lock_guard cache_lock(cache->mutex);
size_t current_downloaded_size = getDownloadedSize();
assert(reserved_size >= current_downloaded_size);
size_t already_reserved_size = reserved_size - current_downloaded_size;
bool reserved = cache->tryReserve(key(), offset(), size_to_reserve, cache_lock);
if (reserved)
bool reserved = already_reserved_size >= size_to_reserve;
if (!reserved)
{
std::lock_guard segment_lock(mutex);
reserved_size += size;
std::lock_guard cache_lock(cache->mutex);
size_to_reserve = size_to_reserve - already_reserved_size;
reserved = cache->tryReserve(key(), offset(), size_to_reserve, cache_lock);
if (reserved)
{
std::lock_guard segment_lock(mutex);
reserved_size += size_to_reserve;
}
}
return reserved;
}
void FileSegment::setDownloaded(std::lock_guard<std::mutex> & /* segment_lock */)
bool FileSegment::isDownloaded() const
{
std::lock_guard segment_lock(mutex);
return isDownloadedUnlocked(segment_lock);
}
bool FileSegment::isDownloadedUnlocked(std::lock_guard<std::mutex> & /* segment_lock */) const
{
return is_downloaded;
}
void FileSegment::setDownloaded([[maybe_unused]] std::lock_guard<std::mutex> & segment_lock)
{
if (is_downloaded)
return;
download_state = State::DOWNLOADED;
is_downloaded = true;
downloader_id.clear();
if (cache_writer)
@ -394,6 +416,12 @@ void FileSegment::setDownloaded(std::lock_guard<std::mutex> & /* segment_lock */
cache_writer.reset();
remote_file_reader.reset();
}
download_state = State::DOWNLOADED;
is_downloaded = true;
assert(getDownloadedSizeUnlocked(segment_lock) > 0);
assert(std::filesystem::file_size(getPathInLocalCache()) > 0);
}
void FileSegment::setDownloadFailed(std::lock_guard<std::mutex> & /* segment_lock */)
@ -426,7 +454,7 @@ void FileSegment::completeBatchAndResetDownloader()
resetDownloaderImpl(segment_lock);
LOG_TEST(log, "Complete batch. Current downloaded size: {}", downloaded_size);
LOG_TEST(log, "Complete batch. Current downloaded size: {}", getDownloadedSizeUnlocked(segment_lock));
cv.notify_all();
}
@ -475,7 +503,7 @@ void FileSegment::completeBasedOnCurrentState(std::lock_guard<std::mutex> & cach
bool is_downloader = isDownloaderImpl(segment_lock);
bool is_last_holder = cache->isLastFileSegmentHolder(key(), offset(), cache_lock, segment_lock);
bool can_update_segment_state = is_downloader || is_last_holder;
size_t current_downloaded_size = getDownloadedSize(segment_lock);
size_t current_downloaded_size = getDownloadedSizeUnlocked(segment_lock);
SCOPE_EXIT({
if (is_downloader)
@ -494,13 +522,6 @@ void FileSegment::completeBasedOnCurrentState(std::lock_guard<std::mutex> & cach
download_state = State::PARTIALLY_DOWNLOADED;
resetDownloaderImpl(segment_lock);
if (cache_writer)
{
cache_writer->finalize();
cache_writer.reset();
remote_file_reader.reset();
}
}
switch (download_state)
@ -514,8 +535,8 @@ void FileSegment::completeBasedOnCurrentState(std::lock_guard<std::mutex> & cach
}
case State::DOWNLOADED:
{
assert(downloaded_size == range().size());
assert(is_downloaded);
assert(getDownloadedSizeUnlocked(segment_lock) == range().size());
assert(isDownloadedUnlocked(segment_lock));
break;
}
case State::DOWNLOADING:
@ -577,7 +598,7 @@ String FileSegment::getInfoForLogImpl(std::lock_guard<std::mutex> & segment_lock
info << "File segment: " << range().toString() << ", ";
info << "key: " << key().toString() << ", ";
info << "state: " << download_state << ", ";
info << "downloaded size: " << getDownloadedSize(segment_lock) << ", ";
info << "downloaded size: " << getDownloadedSizeUnlocked(segment_lock) << ", ";
info << "reserved size: " << reserved_size << ", ";
info << "downloader id: " << (downloader_id.empty() ? "None" : downloader_id) << ", ";
info << "caller id: " << getCallerId() << ", ";
@ -673,7 +694,7 @@ FileSegmentPtr FileSegment::getSnapshot(const FileSegmentPtr & file_segment, std
snapshot->hits_count = file_segment->getHitsCount();
snapshot->ref_count = file_segment.use_count();
snapshot->downloaded_size = file_segment->getDownloadedSize(segment_lock);
snapshot->downloaded_size = file_segment->getDownloadedSizeUnlocked(segment_lock);
snapshot->download_state = file_segment->download_state;
snapshot->is_persistent = file_segment->isPersistent();
@ -818,7 +839,8 @@ void FileSegmentRangeWriter::completeFileSegment(FileSegment & file_segment)
if (file_segment.isDetached())
return;
if (file_segment.getDownloadedSize() > 0)
size_t current_downloaded_size = file_segment.getDownloadedSize();
if (current_downloaded_size > 0)
{
file_segment.getOrSetDownloader();

View File

@ -130,7 +130,7 @@ public:
bool isDownloader() const;
bool isDownloaded() const { return is_downloaded.load(); }
bool isDownloaded() const;
static String getCallerId();
@ -171,7 +171,7 @@ public:
private:
size_t availableSize() const { return reserved_size - downloaded_size; }
size_t getDownloadedSize(std::lock_guard<std::mutex> & segment_lock) const;
size_t getDownloadedSizeUnlocked(std::lock_guard<std::mutex> & segment_lock) const;
String getInfoForLogImpl(std::lock_guard<std::mutex> & segment_lock) const;
void assertCorrectnessImpl(std::lock_guard<std::mutex> & segment_lock) const;
bool hasFinalizedState() const;
@ -187,6 +187,8 @@ private:
void setDownloadFailed(std::lock_guard<std::mutex> & segment_lock);
bool isDownloaderImpl(std::lock_guard<std::mutex> & segment_lock) const;
bool isDownloadedUnlocked(std::lock_guard<std::mutex> & segment_lock) const;
void wrapWithCacheInfo(Exception & e, const String & message, std::lock_guard<std::mutex> & segment_lock) const;
bool lastFileSegmentHolder() const;
@ -236,7 +238,8 @@ private:
/// In general case, all file segments are owned by cache.
bool is_detached = false;
std::atomic<bool> is_downloaded{false};
bool is_downloaded{false};
std::atomic<size_t> hits_count = 0; /// cache hits.
std::atomic<size_t> ref_count = 0; /// Used for getting snapshot state

View File

@ -14,10 +14,10 @@ using namespace std::chrono_literals;
constexpr std::chrono::microseconds ZERO_MICROSEC = 0us;
OvercommitTracker::OvercommitTracker(std::mutex & global_mutex_)
OvercommitTracker::OvercommitTracker(DB::ProcessList * process_list_)
: picked_tracker(nullptr)
, process_list(process_list_)
, cancellation_state(QueryCancellationState::NONE)
, global_mutex(global_mutex_)
, freed_memory(0)
, required_memory(0)
, next_id(0)
@ -33,11 +33,11 @@ OvercommitResult OvercommitTracker::needToStopQuery(MemoryTracker * tracker, Int
return OvercommitResult::NONE;
// NOTE: Do not change the order of locks
//
// global_mutex must be acquired before overcommit_m, because
// global mutex must be acquired before overcommit_m, because
// method OvercommitTracker::onQueryStop(MemoryTracker *) is
// always called with already acquired global_mutex in
// always called with already acquired global mutex in
// ProcessListEntry::~ProcessListEntry().
std::unique_lock<std::mutex> global_lock(global_mutex);
auto global_lock = process_list->unsafeLock();
std::unique_lock<std::mutex> lk(overcommit_m);
size_t id = next_id++;
@ -137,8 +137,8 @@ void OvercommitTracker::releaseThreads()
cv.notify_all();
}
UserOvercommitTracker::UserOvercommitTracker(DB::ProcessList * process_list, DB::ProcessListForUser * user_process_list_)
: OvercommitTracker(process_list->mutex)
UserOvercommitTracker::UserOvercommitTracker(DB::ProcessList * process_list_, DB::ProcessListForUser * user_process_list_)
: OvercommitTracker(process_list_)
, user_process_list(user_process_list_)
{}
@ -169,8 +169,7 @@ void UserOvercommitTracker::pickQueryToExcludeImpl()
}
GlobalOvercommitTracker::GlobalOvercommitTracker(DB::ProcessList * process_list_)
: OvercommitTracker(process_list_->mutex)
, process_list(process_list_)
: OvercommitTracker(process_list_)
{}
void GlobalOvercommitTracker::pickQueryToExcludeImpl()

View File

@ -36,6 +36,12 @@ struct OvercommitRatio
class MemoryTracker;
namespace DB
{
class ProcessList;
struct ProcessListForUser;
}
enum class OvercommitResult
{
NONE,
@ -71,7 +77,7 @@ struct OvercommitTracker : boost::noncopyable
virtual ~OvercommitTracker() = default;
protected:
explicit OvercommitTracker(std::mutex & global_mutex_);
explicit OvercommitTracker(DB::ProcessList * process_list_);
virtual void pickQueryToExcludeImpl() = 0;
@ -86,6 +92,12 @@ protected:
// overcommit tracker is in SELECTED state.
MemoryTracker * picked_tracker;
// Global mutex stored in ProcessList is used to synchronize
// insertion and deletion of queries.
// OvercommitTracker::pickQueryToExcludeImpl() implementations
// require this mutex to be locked, because they read list (or sublist)
// of queries.
DB::ProcessList * process_list;
private:
void pickQueryToExclude()
@ -113,12 +125,6 @@ private:
QueryCancellationState cancellation_state;
// Global mutex which is used in ProcessList to synchronize
// insertion and deletion of queries.
// OvercommitTracker::pickQueryToExcludeImpl() implementations
// require this mutex to be locked, because they read list (or sublist)
// of queries.
std::mutex & global_mutex;
Int64 freed_memory;
Int64 required_memory;
@ -128,15 +134,9 @@ private:
bool allow_release;
};
namespace DB
{
class ProcessList;
struct ProcessListForUser;
}
struct UserOvercommitTracker : OvercommitTracker
{
explicit UserOvercommitTracker(DB::ProcessList * process_list, DB::ProcessListForUser * user_process_list_);
explicit UserOvercommitTracker(DB::ProcessList * process_list_, DB::ProcessListForUser * user_process_list_);
~UserOvercommitTracker() override = default;
@ -155,9 +155,6 @@ struct GlobalOvercommitTracker : OvercommitTracker
protected:
void pickQueryToExcludeImpl() override;
private:
DB::ProcessList * process_list;
};
// This class is used to disallow tracking during logging to avoid deadlocks.

View File

@ -9,6 +9,7 @@
#include <mysqlxx/Pool.h>
#include <base/sleep.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <Common/logger_useful.h>
#include <ctime>
@ -260,7 +261,10 @@ void Pool::Entry::forceConnected() const
else
sleepForSeconds(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
pool->logger.debug("Entry: Reconnecting to MySQL server %s", pool->description);
pool->logger.debug(
"Creating a new MySQL connection to %s with settings: connect_timeout=%u, read_write_timeout=%u",
pool->description, pool->connect_timeout, pool->rw_timeout);
data->conn.connect(
pool->db.c_str(),
pool->server.c_str(),
@ -325,6 +329,10 @@ Pool::Connection * Pool::allocConnection(bool dont_throw_if_failed_first_time)
{
logger.debug("Connecting to %s", description);
logger.debug(
"Creating a new MySQL connection to %s with settings: connect_timeout=%u, read_write_timeout=%u",
description, connect_timeout, rw_timeout);
conn_ptr->conn.connect(
db.c_str(),
server.c_str(),

View File

@ -168,6 +168,7 @@ PoolWithFailover::Entry PoolWithFailover::get()
}
app.logger().warning("Connection to " + pool->getDescription() + " failed: " + e.displayText());
replica_name_to_error_detail.insert_or_assign(pool->getDescription(), ErrorDetail{e.code(), e.displayText()});
continue;
@ -177,7 +178,10 @@ PoolWithFailover::Entry PoolWithFailover::get()
}
}
app.logger().error("Connection to all replicas failed " + std::to_string(try_no + 1) + " times");
if (replicas_by_priority.size() > 1)
app.logger().error("Connection to all mysql replicas failed " + std::to_string(try_no + 1) + " times");
else
app.logger().error("Connection to mysql failed " + std::to_string(try_no + 1) + " times");
}
if (full_pool)
@ -187,7 +191,11 @@ PoolWithFailover::Entry PoolWithFailover::get()
}
DB::WriteBufferFromOwnString message;
message << "Connections to all replicas failed: ";
if (replicas_by_priority.size() > 1)
message << "Connections to all mysql replicas failed: ";
else
message << "Connections to mysql failed: ";
for (auto it = replicas_by_priority.begin(); it != replicas_by_priority.end(); ++it)
{
for (auto jt = it->second.begin(); jt != it->second.end(); ++jt)

View File

@ -169,10 +169,24 @@ public:
unsigned max_connections_ = MYSQLXX_POOL_DEFAULT_MAX_CONNECTIONS,
unsigned enable_local_infile_ = MYSQLXX_DEFAULT_ENABLE_LOCAL_INFILE,
bool opt_reconnect_ = MYSQLXX_DEFAULT_MYSQL_OPT_RECONNECT)
: logger(Poco::Logger::get("mysqlxx::Pool")), default_connections(default_connections_),
max_connections(max_connections_), db(db_), server(server_), user(user_), password(password_), port(port_), socket(socket_),
connect_timeout(connect_timeout_), rw_timeout(rw_timeout_), enable_local_infile(enable_local_infile_),
opt_reconnect(opt_reconnect_) {}
: logger(Poco::Logger::get("mysqlxx::Pool"))
, default_connections(default_connections_)
, max_connections(max_connections_)
, db(db_)
, server(server_)
, user(user_)
, password(password_)
, port(port_)
, socket(socket_)
, connect_timeout(connect_timeout_)
, rw_timeout(rw_timeout_)
, enable_local_infile(enable_local_infile_)
, opt_reconnect(opt_reconnect_)
{
logger.debug(
"Created MySQL Pool with settings: connect_timeout=%u, read_write_timeout=%u, default_connections_number=%u, max_connections_number=%u",
connect_timeout, rw_timeout, default_connections, max_connections);
}
Pool(const Pool & other)
: logger(other.logger), default_connections{other.default_connections},

View File

@ -183,15 +183,15 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
StorageMySQLConfiguration configuration;
ASTs & arguments = engine->arguments->children;
MySQLSettings mysql_settings;
auto mysql_settings = std::make_unique<ConnectionMySQLSettings>();
if (auto named_collection = getExternalDataSourceConfiguration(arguments, context, true, true, mysql_settings))
if (auto named_collection = getExternalDataSourceConfiguration(arguments, context, true, true, *mysql_settings))
{
auto [common_configuration, storage_specific_args, settings_changes] = named_collection.value();
configuration.set(common_configuration);
configuration.addresses = {std::make_pair(configuration.host, configuration.port)};
mysql_settings.applyChanges(settings_changes);
mysql_settings->applyChanges(settings_changes);
if (!storage_specific_args.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
@ -228,15 +228,14 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
{
if (engine_name == "MySQL")
{
auto mysql_database_settings = std::make_unique<ConnectionMySQLSettings>();
auto mysql_pool = createMySQLPoolWithFailover(configuration, mysql_settings);
mysql_settings->loadFromQueryContext(context);
mysql_settings->loadFromQuery(*engine_define); /// higher priority
mysql_database_settings->loadFromQueryContext(context);
mysql_database_settings->loadFromQuery(*engine_define); /// higher priority
auto mysql_pool = createMySQLPoolWithFailover(configuration, *mysql_settings);
return std::make_shared<DatabaseMySQL>(
context, database_name, metadata_path, engine_define, configuration.database,
std::move(mysql_database_settings), std::move(mysql_pool), create.attach);
std::move(mysql_settings), std::move(mysql_pool), create.attach);
}
MySQLClient client(configuration.host, configuration.port, configuration.username, configuration.password);

View File

@ -703,22 +703,6 @@ bool CachedOnDiskReadBufferFromFile::updateImplementationBufferIfNeeded()
size_t download_offset = file_segment->getDownloadOffset();
bool cached_part_is_finished = download_offset == file_offset_of_buffer_end;
#ifndef NDEBUG
size_t cache_file_size = getFileSizeFromReadBuffer(*implementation_buffer);
size_t cache_file_read_offset = implementation_buffer->getFileOffsetOfBufferEnd();
size_t implementation_buffer_finished = cache_file_size == cache_file_read_offset;
if (cached_part_is_finished != implementation_buffer_finished)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Incorrect state of buffers. Current download offset: {}, file offset of buffer end: {}, "
"cache file size: {}, cache file offset: {}, file segment info: {}",
download_offset, file_offset_of_buffer_end, cache_file_size, cache_file_read_offset,
file_segment->getInfoForLog());
}
#endif
if (cached_part_is_finished)
{
/// TODO: makes sense to reuse local file reader if we return here with CACHED read type again?

View File

@ -82,7 +82,7 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as
bool is_unlimited_query = isUnlimitedQuery(ast);
{
std::unique_lock lock(mutex);
auto [lock, overcommit_blocker] = safeLock(); // To avoid deadlock in case of OOM
IAST::QueryKind query_kind = ast->getQueryKind();
const auto queue_max_wait_ms = settings.queue_max_wait_ms.totalMilliseconds();
@ -269,7 +269,7 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as
ProcessListEntry::~ProcessListEntry()
{
std::lock_guard lock(parent.mutex);
auto lock = parent.safeLock();
String user = it->getClientInfo().current_user;
String query_id = it->getClientInfo().current_query_id;
@ -430,7 +430,7 @@ QueryStatus * ProcessList::tryGetProcessListElement(const String & current_query
CancellationCode ProcessList::sendCancelToQuery(const String & current_query_id, const String & current_user, bool kill)
{
std::lock_guard lock(mutex);
auto lock = safeLock();
QueryStatus * elem = tryGetProcessListElement(current_query_id, current_user);
@ -443,7 +443,7 @@ CancellationCode ProcessList::sendCancelToQuery(const String & current_query_id,
void ProcessList::killAllQueries()
{
std::lock_guard lock(mutex);
auto lock = safeLock();
for (auto & process : processes)
process.cancelQuery(true);
@ -495,7 +495,7 @@ ProcessList::Info ProcessList::getInfo(bool get_thread_list, bool get_profile_ev
{
Info per_query_infos;
std::lock_guard lock(mutex);
auto lock = safeLock();
per_query_infos.reserve(processes.size());
for (const auto & process : processes)
@ -530,7 +530,7 @@ ProcessList::UserInfo ProcessList::getUserInfo(bool get_profile_events) const
{
UserInfo per_user_infos;
std::lock_guard lock(mutex);
auto lock = safeLock();
per_user_infos.reserve(user_to_queries.size());

View File

@ -285,7 +285,26 @@ public:
};
class ProcessList
class ProcessListBase
{
mutable std::mutex mutex;
protected:
using Lock = std::unique_lock<std::mutex>;
struct LockAndBlocker
{
Lock lock;
OvercommitTrackerBlockerInThread blocker;
};
// It is forbidden to do allocations/deallocations with acquired mutex and
// enabled OvercommitTracker. This leads to deadlock in the case of OOM.
LockAndBlocker safeLock() const noexcept { return { std::unique_lock{mutex}, {} }; }
Lock unsafeLock() const noexcept { return std::unique_lock{mutex}; }
};
class ProcessList : public ProcessListBase
{
public:
using Element = QueryStatus;
@ -304,10 +323,10 @@ public:
protected:
friend class ProcessListEntry;
friend struct ::OvercommitTracker;
friend struct ::UserOvercommitTracker;
friend struct ::GlobalOvercommitTracker;
mutable std::mutex mutex;
mutable std::condition_variable have_space; /// Number of currently running queries has become less than maximum.
/// List of queries
@ -360,19 +379,19 @@ public:
void setMaxSize(size_t max_size_)
{
std::lock_guard lock(mutex);
auto lock = unsafeLock();
max_size = max_size_;
}
void setMaxInsertQueriesAmount(size_t max_insert_queries_amount_)
{
std::lock_guard lock(mutex);
auto lock = unsafeLock();
max_insert_queries_amount = max_insert_queries_amount_;
}
void setMaxSelectQueriesAmount(size_t max_select_queries_amount_)
{
std::lock_guard lock(mutex);
auto lock = unsafeLock();
max_select_queries_amount = max_select_queries_amount_;
}

View File

@ -154,17 +154,6 @@ void RequiredSourceColumnsMatcher::visit(const ASTSelectQuery & select, const AS
find_columns(interpolate->as<ASTInterpolateElement>()->expr.get());
}
if (const auto & with = select.with())
{
for (auto & node : with->children)
{
if (const auto * identifier = node->as<ASTIdentifier>())
data.addColumnIdentifier(*identifier);
else
data.addColumnAliasIfAny(*node);
}
}
std::vector<ASTPtr *> out;
for (const auto & node : select.children)
{

View File

@ -1862,6 +1862,11 @@ void GRPCServer::start()
queue = builder.AddCompletionQueue();
grpc_server = builder.BuildAndStart();
if (nullptr == grpc_server)
{
throw DB::Exception("Can't start grpc server, there is a port conflict", DB::ErrorCodes::NETWORK_ERROR);
}
runner->start();
}

View File

@ -17,6 +17,7 @@
#endif
#if USE_MYSQL
#include <Storages/MySQL/MySQLSettings.h>
#include <Databases/MySQL/ConnectionMySQLSettings.h>
#endif
#if USE_NATSIO
#include <Storages/NATS/NATSSettings.h>
@ -575,6 +576,10 @@ template
std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
const ASTs & args, ContextPtr context, bool is_database_engine, bool throw_on_no_collection, const BaseSettings<MySQLSettingsTraits> & storage_settings);
template
std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
const ASTs & args, ContextPtr context, bool is_database_engine, bool throw_on_no_collection, const BaseSettings<ConnectionMySQLSettingsTraits> & storage_settings);
template
std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix,
@ -583,5 +588,6 @@ std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
template
SettingsChanges getSettingsChangesFromConfig(
const BaseSettings<MySQLSettingsTraits> & settings, const Poco::Util::AbstractConfiguration & config, const String & config_prefix);
#endif
}

View File

@ -4,6 +4,7 @@
#include <mysqlxx/PoolWithFailover.h>
#include <Storages/ExternalDataSourceConfiguration.h>
#include <Storages/MySQL/MySQLSettings.h>
#include <Databases/MySQL/ConnectionMySQLSettings.h>
namespace DB
{
@ -13,8 +14,8 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
mysqlxx::PoolWithFailover
createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, const MySQLSettings & mysql_settings)
template <typename T> mysqlxx::PoolWithFailover
createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, const T & mysql_settings)
{
if (!mysql_settings.connection_pool_size)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Connection pool cannot have zero size");
@ -29,6 +30,11 @@ createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, con
mysql_settings.read_write_timeout);
}
template
mysqlxx::PoolWithFailover createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, const MySQLSettings & mysql_settings);
template
mysqlxx::PoolWithFailover createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, const ConnectionMySQLSettings & mysql_settings);
}
#endif

View File

@ -9,10 +9,9 @@ namespace mysqlxx { class PoolWithFailover; }
namespace DB
{
struct StorageMySQLConfiguration;
struct MySQLSettings;
mysqlxx::PoolWithFailover
createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, const MySQLSettings & mysql_settings);
template <typename T> mysqlxx::PoolWithFailover
createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, const T & mysql_settings);
}

View File

@ -15,13 +15,18 @@ namespace ErrorCodes
IMPLEMENT_SETTINGS_TRAITS(MySQLSettingsTraits, LIST_OF_MYSQL_SETTINGS)
void MySQLSettings::loadFromQuery(const ASTSetQuery & settings_def)
{
applyChanges(settings_def.changes);
}
void MySQLSettings::loadFromQuery(ASTStorage & storage_def)
{
if (storage_def.settings)
{
try
{
applyChanges(storage_def.settings->changes);
loadFromQuery(*storage_def.settings);
}
catch (Exception & e)
{
@ -39,4 +44,3 @@ void MySQLSettings::loadFromQuery(ASTStorage & storage_def)
}
}

View File

@ -13,6 +13,7 @@ namespace Poco::Util
namespace DB
{
class ASTStorage;
class ASTSetQuery;
#define LIST_OF_MYSQL_SETTINGS(M) \
M(UInt64, connection_pool_size, 16, "Size of connection pool (if all connections are in use, the query will wait until some connection will be freed).", 0) \
@ -32,6 +33,7 @@ using MySQLBaseSettings = BaseSettings<MySQLSettingsTraits>;
struct MySQLSettings : public MySQLBaseSettings
{
void loadFromQuery(ASTStorage & storage_def);
void loadFromQuery(const ASTSetQuery & settings_def);
};

View File

@ -37,11 +37,26 @@ void TableFunctionMySQL::parseArguments(const ASTPtr & ast_function, ContextPtr
if (!args_func.arguments)
throw Exception("Table function 'mysql' must have arguments.", ErrorCodes::LOGICAL_ERROR);
auto & args = args_func.arguments->children;
MySQLSettings mysql_settings;
configuration = StorageMySQL::getConfiguration(args_func.arguments->children, context, mysql_settings);
const auto & settings = context->getSettingsRef();
mysql_settings.connect_timeout = settings.external_storage_connect_timeout_sec;
mysql_settings.read_write_timeout = settings.external_storage_rw_timeout_sec;
for (auto it = args.begin(); it != args.end(); ++it)
{
const ASTSetQuery * settings_ast = (*it)->as<ASTSetQuery>();
if (settings_ast)
{
mysql_settings.loadFromQuery(*settings_ast);
args.erase(it);
break;
}
}
configuration = StorageMySQL::getConfiguration(args, context, mysql_settings);
pool.emplace(createMySQLPoolWithFailover(*configuration, mysql_settings));
}

View File

@ -0,0 +1,11 @@
<yandex>
<users>
<test_dns>
<password/>
<networks>
<host_regexp>test1\.example\.com$</host_regexp>
</networks>
<profile>default</profile>
</test_dns>
</users>
</yandex>

View File

@ -0,0 +1,5 @@
<yandex>
<listen_host>::</listen_host>
<listen_host>0.0.0.0</listen_host>
<listen_try>1</listen_try>
</yandex>

View File

@ -0,0 +1,46 @@
import pytest
from helpers.cluster import ClickHouseCluster, get_docker_compose_path, run_and_check
import os
DOCKER_COMPOSE_PATH = get_docker_compose_path()
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
cluster = ClickHouseCluster(__file__)
ch_server = cluster.add_instance(
"clickhouse-server",
main_configs=["configs/listen_host.xml"],
user_configs=["configs/host_regexp.xml"],
)
client = cluster.add_instance(
"clickhouse-client",
)
def build_endpoint_v4(ip):
return f"'http://{ip}:8123/?query=SELECT+1&user=test_dns'"
@pytest.fixture(scope="module")
def started_cluster():
global cluster
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_host_regexp_multiple_ptr_hosts_file_v4(started_cluster):
server_ip = cluster.get_instance_ip("clickhouse-server")
client_ip = cluster.get_instance_ip("clickhouse-client")
ch_server.exec_in_container(
(["bash", "-c", f"echo '{client_ip} test1.example.com' > /etc/hosts"])
)
endpoint = build_endpoint_v4(server_ip)
assert "1\n" == client.exec_in_container((["bash", "-c", f"curl {endpoint}"]))

View File

@ -250,7 +250,7 @@ def test_mysql_client_exception(started_cluster):
expected_msg = "\n".join(
[
"mysql: [Warning] Using a password on the command line interface can be insecure.",
"ERROR 1000 (00000) at line 1: Poco::Exception. Code: 1000, e.code() = 0, Exception: Connections to all replicas failed: default@127.0.0.1:10086 as user default",
"ERROR 1000 (00000) at line 1: Poco::Exception. Code: 1000, e.code() = 0, Exception: Connections to mysql failed: default@127.0.0.1:10086 as user default",
]
)
assert stderr[: len(expected_msg)].decode() == expected_msg

View File

@ -7,12 +7,18 @@
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3>
<cache_s3>
<type>cache</type>
<disk>s3</disk>
<max_size>100000000</max_size>
<path>./cache_s3/</path>
</cache_s3>
</disks>
<policies>
<s3>
<volumes>
<main>
<disk>s3</disk>
<disk>cache_s3</disk>
</main>
</volumes>
</s3>

View File

@ -30,5 +30,16 @@
<table>test_table</table>
<connection_pool_size>0</connection_pool_size>
</mysql4>
<mysql_with_settings>
<user>root</user>
<password>clickhouse</password>
<host>mysql57</host>
<port>3306</port>
<database>clickhouse</database>
<table>test_settings</table>
<connection_pool_size>1</connection_pool_size>
<read_write_timeout>20123001</read_write_timeout>
<connect_timeout>20123002</connect_timeout>
</mysql_with_settings>
</named_collections>
</clickhouse>

View File

@ -732,6 +732,93 @@ def test_mysql_null(started_cluster):
conn.close()
def test_settings(started_cluster):
table_name = "test_settings"
node1.query(f"DROP TABLE IF EXISTS {table_name}")
wait_timeout = 123
rw_timeout = 10123001
connect_timeout = 10123002
connection_pool_size = 1
conn = get_mysql_conn(started_cluster, cluster.mysql_ip)
drop_mysql_table(conn, table_name)
create_mysql_table(conn, table_name)
node1.query(
f"""
CREATE TABLE {table_name}
(
id UInt32,
name String,
age UInt32,
money UInt32
)
ENGINE = MySQL('mysql57:3306', 'clickhouse', '{table_name}', 'root', 'clickhouse')
SETTINGS connection_wait_timeout={wait_timeout}, connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}, connection_pool_size={connection_pool_size}
"""
)
node1.query(f"SELECT * FROM {table_name}")
assert node1.contains_in_log(
f"with settings: connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}"
)
rw_timeout = 20123001
connect_timeout = 20123002
node1.query(f"SELECT * FROM mysql(mysql_with_settings)")
assert node1.contains_in_log(
f"with settings: connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}"
)
rw_timeout = 30123001
connect_timeout = 30123002
node1.query(
f"""
SELECT *
FROM mysql('mysql57:3306', 'clickhouse', '{table_name}', 'root', 'clickhouse',
SETTINGS
connection_wait_timeout={wait_timeout},
connect_timeout={connect_timeout},
read_write_timeout={rw_timeout},
connection_pool_size={connection_pool_size})
"""
)
assert node1.contains_in_log(
f"with settings: connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}"
)
rw_timeout = 40123001
connect_timeout = 40123002
node1.query(
f"""
CREATE DATABASE m
ENGINE = MySQL(mysql_with_settings, connection_wait_timeout={wait_timeout}, connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}, connection_pool_size={connection_pool_size})
"""
)
assert node1.contains_in_log(
f"with settings: connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}"
)
rw_timeout = 50123001
connect_timeout = 50123002
node1.query(
f"""
CREATE DATABASE mm ENGINE = MySQL('mysql57:3306', 'clickhouse', 'root', 'clickhouse')
SETTINGS
connection_wait_timeout={wait_timeout},
connect_timeout={connect_timeout},
read_write_timeout={rw_timeout},
connection_pool_size={connection_pool_size}
"""
)
assert node1.contains_in_log(
f"with settings: connect_timeout={connect_timeout}, read_write_timeout={rw_timeout}"
)
drop_mysql_table(conn, table_name)
conn.close()
if __name__ == "__main__":
with contextmanager(started_cluster)() as cluster:
for name, instance in list(cluster.instances.items()):

View File

@ -0,0 +1,9 @@
WITH x AS y SELECT 1;
DROP TEMPORARY TABLE IF EXISTS t1;
DROP TEMPORARY TABLE IF EXISTS t2;
CREATE TEMPORARY TABLE t1 (a Int64);
CREATE TEMPORARY TABLE t2 (a Int64, b Int64);
WITH b AS bb SELECT bb FROM t2 WHERE a IN (SELECT a FROM t1);