Merge branch 'master' into show-table

This commit is contained in:
flynn 2023-04-10 14:55:16 +08:00 committed by GitHub
commit feb1dce1e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
111 changed files with 585 additions and 287 deletions

View File

@ -155,13 +155,13 @@ struct common_type<wide::integer<Bits, Signed>, Arithmetic>
std::is_floating_point_v<Arithmetic>,
Arithmetic,
std::conditional_t<
sizeof(Arithmetic) < Bits * sizeof(long),
sizeof(Arithmetic) * 8 < Bits,
wide::integer<Bits, Signed>,
std::conditional_t<
Bits * sizeof(long) < sizeof(Arithmetic),
Bits < sizeof(Arithmetic) * 8,
Arithmetic,
std::conditional_t<
Bits * sizeof(long) == sizeof(Arithmetic) && (std::is_same_v<Signed, signed> || std::is_signed_v<Arithmetic>),
Bits == sizeof(Arithmetic) * 8 && (std::is_same_v<Signed, signed> || std::is_signed_v<Arithmetic>),
Arithmetic,
wide::integer<Bits, Signed>>>>>;
};

2
contrib/cctz vendored

@ -1 +1 @@
Subproject commit 7c78edd52b4d65acc103c2f195818ffcabe6fe0d
Subproject commit 5e05432420f9692418e2e12aff09859e420b14a2

View File

@ -6,7 +6,7 @@ title: deltaSumTimestamp
Adds the difference between consecutive rows. If the difference is negative, it is ignored.
This function is primarily for [materialized views](../../../sql-reference/statements/create/view.md#materialized) that are ordered by some time bucket-aligned timestamp, for example, a `toStartOfMinute` bucket. Because the rows in such a materialized view will all have the same timestamp, it is impossible for them to be merged in the "right" order. This function keeps track of the `timestamp` of the values it's seen, so it's possible to order the states correctly during merging.
This function is primarily for [materialized views](../../../sql-reference/statements/create/view.md#materialized) that store data ordered by some time bucket-aligned timestamp, for example, a `toStartOfMinute` bucket. Because the rows in such a materialized view will all have the same timestamp, it is impossible for them to be merged in the correct order, without storing the original, unrounded timestamp value. The `deltaSumTimestamp` function keeps track of the original `timestamp` of the values it's seen, so the values (states) of the function are correctly computed during merging of parts.
To calculate the delta sum across an ordered collection you can simply use the [deltaSum](../../../sql-reference/aggregate-functions/reference/deltasum.md#agg_functions-deltasum) function.

View File

@ -441,11 +441,11 @@ SELECT farmHash64(array('e','x','a'), 'mple', 10, toDateTime('2019-06-15 23:00:0
## javaHash
Calculates JavaHash from a [string](http://hg.openjdk.java.net/jdk8u/jdk8u/jdk/file/478a4add975b/src/share/classes/java/lang/String.java#l1452),
[Byte](https://hg.openjdk.java.net/jdk8u/jdk8u/jdk/file/478a4add975b/src/share/classes/java/lang/Byte.java#l405),
[Short](https://hg.openjdk.java.net/jdk8u/jdk8u/jdk/file/478a4add975b/src/share/classes/java/lang/Short.java#l410),
[Integer](https://hg.openjdk.java.net/jdk8u/jdk8u/jdk/file/478a4add975b/src/share/classes/java/lang/Integer.java#l959),
[Long](https://hg.openjdk.java.net/jdk8u/jdk8u/jdk/file/478a4add975b/src/share/classes/java/lang/Long.java#l1060).
Calculates JavaHash from a [string](http://hg.openjdk.java.net/jdk8u/jdk8u/jdk/file/478a4add975b/src/share/classes/java/lang/String.java#l1452),
[Byte](https://hg.openjdk.java.net/jdk8u/jdk8u/jdk/file/478a4add975b/src/share/classes/java/lang/Byte.java#l405),
[Short](https://hg.openjdk.java.net/jdk8u/jdk8u/jdk/file/478a4add975b/src/share/classes/java/lang/Short.java#l410),
[Integer](https://hg.openjdk.java.net/jdk8u/jdk8u/jdk/file/478a4add975b/src/share/classes/java/lang/Integer.java#l959),
[Long](https://hg.openjdk.java.net/jdk8u/jdk8u/jdk/file/478a4add975b/src/share/classes/java/lang/Long.java#l1060).
This hash function is neither fast nor having a good quality. The only reason to use it is when this algorithm is already used in another system and you have to calculate exactly the same result.
Note that Java only support calculating signed integers hash, so if you want to calculate unsigned integers hash you must cast it to proper signed ClickHouse types.
@ -660,6 +660,45 @@ Result:
└──────────────────────┴─────────────────────┘
```
## kafkaMurmurHash
Calculates a 32-bit [MurmurHash2](https://github.com/aappleby/smhasher) hash value using the same hash seed as [Kafka](https://github.com/apache/kafka/blob/461c5cfe056db0951d9b74f5adc45973670404d7/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L482) and without the highest bit to be compatible with [Default Partitioner](https://github.com/apache/kafka/blob/139f7709bd3f5926901a21e55043388728ccca78/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java#L328).
**Syntax**
```sql
MurmurHash(par1, ...)
```
**Arguments**
- `par1, ...` — A variable number of parameters that can be any of the [supported data types](/docs/en/sql-reference/data-types/index.md/#data_types).
**Returned value**
- Calculated hash value.
Type: [UInt32](/docs/en/sql-reference/data-types/int-uint.md).
**Example**
Query:
```sql
SELECT
kafkaMurmurHash('foobar') AS res1,
kafkaMurmurHash(array('e','x','a'), 'mple', 10, toDateTime('2019-06-15 23:00:00')) AS res2
```
Result:
```response
┌───────res1─┬─────res2─┐
│ 1357151166 │ 85479775 │
└────────────┴──────────┘
```
## murmurHash3_32, murmurHash3_64
Produces a [MurmurHash3](https://github.com/aappleby/smhasher) hash value.

View File

@ -7,7 +7,7 @@ sidebar_position: 141
Суммирует разницу между последовательными строками. Если разница отрицательна — она будет проигнорирована.
Эта функция предназначена в первую очередь для [материализованных представлений](../../../sql-reference/statements/create/view.md#materialized), упорядоченных по некоторому временному бакету согласно timestamp, например, по бакету `toStartOfMinute`. Поскольку строки в таком материализованном представлении будут иметь одинаковый timestamp, невозможно объединить их в "правом" порядке. Функция отслеживает `timestamp` наблюдаемых значений, поэтому возможно правильно упорядочить состояния во время слияния.
Эта функция предназначена в первую очередь для [материализованных представлений](../../../sql-reference/statements/create/view.md#materialized), хранящих данные, упорядоченные по некоторому округленному временному интервалу, согласно timestamp, например, по бакету `toStartOfMinute`. Поскольку строки в таком материализованном представлении будут иметь одинаковый timestamp, их невозможно объединить в правильном порядке без хранения исходного, неокругленного значения timestamp. Функция `deltaSumTimestamp` отслеживает исходные `timestamp` наблюдаемых значений, поэтому значения (состояния) функции правильно вычисляются во время слияния кусков.
Чтобы вычислить разницу между упорядоченными последовательными строками, вы можете использовать функцию [deltaSum](../../../sql-reference/aggregate-functions/reference/deltasum.md#agg_functions-deltasum) вместо функции `deltaSumTimestamp`.

View File

@ -17,7 +17,6 @@
#include <Poco/Net/TCPServerParams.h>
#include <Poco/Net/TCPServer.h>
#include <Poco/Util/HelpFormatter.h>
#include <Poco/Version.h>
#include <Poco/Environment.h>
#include <sys/stat.h>
#include <pwd.h>

View File

@ -981,7 +981,7 @@ try
StatusFile status{path / "status", StatusFile::write_full_info};
DB::ServerUUID::load(path / "uuid", log);
ServerUUID::load(path / "uuid", log);
/// Try to increase limit on number of open files.
{

View File

@ -10,6 +10,7 @@
#include <Interpreters/Access/InterpreterCreateUserQuery.h>
#include <Interpreters/Access/InterpreterShowGrantsQuery.h>
#include <Common/logger_useful.h>
#include <Common/ThreadPool.h>
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Stringifier.h>
@ -19,6 +20,7 @@
#include <base/range.h>
#include <filesystem>
#include <fstream>
#include <memory>
namespace DB
@ -317,15 +319,15 @@ void DiskAccessStorage::scheduleWriteLists(AccessEntityType type)
return; /// If the lists' writing thread is still waiting we can update `types_of_lists_to_write` easily,
/// without restarting that thread.
if (lists_writing_thread.joinable())
lists_writing_thread.join();
if (lists_writing_thread && lists_writing_thread->joinable())
lists_writing_thread->join();
/// Create the 'need_rebuild_lists.mark' file.
/// This file will be used later to find out if writing lists is successful or not.
std::ofstream out{getNeedRebuildListsMarkFilePath(directory_path)};
out.close();
lists_writing_thread = ThreadFromGlobalPool{&DiskAccessStorage::listsWritingThreadFunc, this};
lists_writing_thread = std::make_unique<ThreadFromGlobalPool>(&DiskAccessStorage::listsWritingThreadFunc, this);
lists_writing_thread_is_waiting = true;
}
@ -349,10 +351,10 @@ void DiskAccessStorage::listsWritingThreadFunc()
void DiskAccessStorage::stopListsWritingThread()
{
if (lists_writing_thread.joinable())
if (lists_writing_thread && lists_writing_thread->joinable())
{
lists_writing_thread_should_exit.notify_one();
lists_writing_thread.join();
lists_writing_thread->join();
}
}

View File

@ -1,7 +1,7 @@
#pragma once
#include <Access/MemoryAccessStorage.h>
#include <Common/ThreadPool.h>
#include <Common/ThreadPool_fwd.h>
#include <boost/container/flat_set.hpp>
@ -81,7 +81,7 @@ private:
bool failed_to_write_lists TSA_GUARDED_BY(mutex) = false;
/// List files are written in a separate thread.
ThreadFromGlobalPool lists_writing_thread;
std::unique_ptr<ThreadFromGlobalPool> lists_writing_thread;
/// Signals `lists_writing_thread` to exit.
std::condition_variable lists_writing_thread_should_exit;

View File

@ -1,3 +1,4 @@
#include <memory>
#include <Access/AccessEntityIO.h>
#include <Access/MemoryAccessStorage.h>
#include <Access/ReplicatedAccessStorage.h>
@ -15,6 +16,7 @@
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/escapeForFileName.h>
#include <Common/setThreadName.h>
#include <Common/ThreadPool.h>
#include <base/range.h>
#include <base/sleep.h>
#include <boost/range/algorithm_ext/erase.hpp>
@ -72,7 +74,7 @@ void ReplicatedAccessStorage::startWatchingThread()
{
bool prev_watching_flag = watching.exchange(true);
if (!prev_watching_flag)
watching_thread = ThreadFromGlobalPool(&ReplicatedAccessStorage::runWatchingThread, this);
watching_thread = std::make_unique<ThreadFromGlobalPool>(&ReplicatedAccessStorage::runWatchingThread, this);
}
void ReplicatedAccessStorage::stopWatchingThread()
@ -81,8 +83,8 @@ void ReplicatedAccessStorage::stopWatchingThread()
if (prev_watching_flag)
{
watched_queue->finish();
if (watching_thread.joinable())
watching_thread.join();
if (watching_thread && watching_thread->joinable())
watching_thread->join();
}
}

View File

@ -2,7 +2,7 @@
#include <atomic>
#include <Common/ThreadPool.h>
#include <Common/ThreadPool_fwd.h>
#include <Common/ZooKeeper/Common.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ConcurrentBoundedQueue.h>
@ -21,7 +21,7 @@ public:
static constexpr char STORAGE_TYPE[] = "replicated";
ReplicatedAccessStorage(const String & storage_name, const String & zookeeper_path, zkutil::GetZooKeeper get_zookeeper, AccessChangesNotifier & changes_notifier_, bool allow_backup);
virtual ~ReplicatedAccessStorage() override;
~ReplicatedAccessStorage() override;
const char * getStorageType() const override { return STORAGE_TYPE; }
@ -43,7 +43,7 @@ private:
std::mutex cached_zookeeper_mutex;
std::atomic<bool> watching = false;
ThreadFromGlobalPool watching_thread;
std::unique_ptr<ThreadFromGlobalPool> watching_thread;
std::shared_ptr<ConcurrentBoundedQueue<UUID>> watched_queue;
std::optional<UUID> insertImpl(const AccessEntityPtr & entity, bool replace_if_exists, bool throw_if_exists) override;

View File

@ -9,7 +9,7 @@
#include <Interpreters/Context_fwd.h>
#include <base/types.h>
#include <Common/Exception.h>
#include <Common/ThreadPool.h>
#include <Common/ThreadPool_fwd.h>
#include <Core/IResolvedFunction.h>
#include "config.h"

View File

@ -1,5 +1,6 @@
#include <Backups/BackupCoordinationFileInfos.h>
#include <Common/quoteString.h>
#include <Common/Exception.h>
namespace DB

View File

@ -15,6 +15,7 @@
#include <base/sleep.h>
#include <Common/escapeForFileName.h>
#include <boost/range/algorithm/copy.hpp>
#include <base/scope_guard.h>
#include <filesystem>
namespace fs = std::filesystem;

View File

@ -6,6 +6,7 @@
#include <Common/logger_useful.h>
#include <Common/scope_guard_safe.h>
#include <Common/setThreadName.h>
#include <Common/ThreadPool.h>
#include <IO/HashingReadBuffer.h>

View File

@ -1,8 +1,9 @@
#pragma once
#include <Core/Types.h>
#include <Common/ThreadPool.h>
#include <Common/ThreadPool_fwd.h>
namespace Poco { class Logger; }
namespace DB
{

View File

@ -1,7 +1,6 @@
#pragma once
#include <Parsers/ASTBackupQuery.h>
#include <Common/ThreadPool.h>
namespace DB

View File

@ -23,6 +23,7 @@
#include <Common/CurrentMetrics.h>
#include <Common/setThreadName.h>
#include <Common/scope_guard_safe.h>
#include <Common/ThreadPool.h>
namespace CurrentMetrics
@ -182,8 +183,8 @@ namespace
BackupsWorker::BackupsWorker(size_t num_backup_threads, size_t num_restore_threads, bool allow_concurrent_backups_, bool allow_concurrent_restores_)
: backups_thread_pool(CurrentMetrics::BackupsThreads, CurrentMetrics::BackupsThreadsActive, num_backup_threads, /* max_free_threads = */ 0, num_backup_threads)
, restores_thread_pool(CurrentMetrics::RestoreThreads, CurrentMetrics::RestoreThreadsActive, num_restore_threads, /* max_free_threads = */ 0, num_restore_threads)
: backups_thread_pool(std::make_unique<ThreadPool>(CurrentMetrics::BackupsThreads, CurrentMetrics::BackupsThreadsActive, num_backup_threads, /* max_free_threads = */ 0, num_backup_threads))
, restores_thread_pool(std::make_unique<ThreadPool>(CurrentMetrics::RestoreThreads, CurrentMetrics::RestoreThreadsActive, num_restore_threads, /* max_free_threads = */ 0, num_restore_threads))
, log(&Poco::Logger::get("BackupsWorker"))
, allow_concurrent_backups(allow_concurrent_backups_)
, allow_concurrent_restores(allow_concurrent_restores_)
@ -248,7 +249,7 @@ OperationID BackupsWorker::startMakingBackup(const ASTPtr & query, const Context
if (backup_settings.async)
{
backups_thread_pool.scheduleOrThrowOnError(
backups_thread_pool->scheduleOrThrowOnError(
[this, backup_query, backup_id, backup_name_for_logging, backup_info, backup_settings, backup_coordination, context_in_use, mutable_context]
{
doBackup(
@ -435,7 +436,7 @@ void BackupsWorker::buildFileInfosForBackupEntries(const BackupPtr & backup, con
LOG_TRACE(log, "{}", Stage::BUILDING_FILE_INFOS);
backup_coordination->setStage(Stage::BUILDING_FILE_INFOS, "");
backup_coordination->waitForStage(Stage::BUILDING_FILE_INFOS);
backup_coordination->addFileInfos(::DB::buildFileInfosForBackupEntries(backup_entries, backup->getBaseBackup(), backups_thread_pool));
backup_coordination->addFileInfos(::DB::buildFileInfosForBackupEntries(backup_entries, backup->getBaseBackup(), *backups_thread_pool));
}
@ -522,7 +523,7 @@ void BackupsWorker::writeBackupEntries(BackupMutablePtr backup, BackupEntries &&
}
};
if (always_single_threaded || !backups_thread_pool.trySchedule([job] { job(true); }))
if (always_single_threaded || !backups_thread_pool->trySchedule([job] { job(true); }))
job(false);
}
@ -581,7 +582,7 @@ OperationID BackupsWorker::startRestoring(const ASTPtr & query, ContextMutablePt
if (restore_settings.async)
{
restores_thread_pool.scheduleOrThrowOnError(
restores_thread_pool->scheduleOrThrowOnError(
[this, restore_query, restore_id, backup_name_for_logging, backup_info, restore_settings, restore_coordination, context_in_use]
{
doRestore(
@ -716,7 +717,7 @@ void BackupsWorker::doRestore(
}
/// Execute the data restoring tasks.
restoreTablesData(restore_id, backup, std::move(data_restore_tasks), restores_thread_pool);
restoreTablesData(restore_id, backup, std::move(data_restore_tasks), *restores_thread_pool);
/// We have restored everything, we need to tell other hosts (they could be waiting for it).
restore_coordination->setStage(Stage::COMPLETED, "");
@ -941,8 +942,8 @@ void BackupsWorker::shutdown()
if (has_active_backups_and_restores)
LOG_INFO(log, "Waiting for {} backups and {} restores to be finished", num_active_backups, num_active_restores);
backups_thread_pool.wait();
restores_thread_pool.wait();
backups_thread_pool->wait();
restores_thread_pool->wait();
if (has_active_backups_and_restores)
LOG_INFO(log, "All backup and restore tasks have finished");

View File

@ -1,7 +1,8 @@
#pragma once
#include <Backups/BackupStatus.h>
#include <Common/ThreadPool.h>
#include <Common/ThreadPool_fwd.h>
#include <Interpreters/Context_fwd.h>
#include <Core/UUID.h>
#include <Parsers/IAST_fwd.h>
#include <unordered_map>
@ -132,8 +133,8 @@ private:
void setNumFilesAndSize(const OperationID & id, size_t num_files, UInt64 total_size, size_t num_entries,
UInt64 uncompressed_size, UInt64 compressed_size, size_t num_read_files, UInt64 num_read_bytes);
ThreadPool backups_thread_pool;
ThreadPool restores_thread_pool;
std::unique_ptr<ThreadPool> backups_thread_pool;
std::unique_ptr<ThreadPool> restores_thread_pool;
std::unordered_map<OperationID, Info> infos;
std::condition_variable status_changed;

View File

@ -14,6 +14,7 @@
#include <Server/HTTP/HTTPServer.h>
#include <base/errnoToString.h>
#include <base/range.h>
#include <base/scope_guard.h>
#include <sys/time.h>
#include <sys/resource.h>

View File

@ -7,6 +7,7 @@
#include <base/argsToConfig.h>
#include <base/safeExit.h>
#include <base/scope_guard.h>
#include <Core/Block.h>
#include <Core/Protocol.h>
#include <Common/DateLUT.h>
@ -2219,9 +2220,6 @@ void ClientBase::runInteractive()
LineReader lr(history_file, config().has("multiline"), query_extenders, query_delimiters);
#endif
/// Enable bracketed-paste-mode so that we are able to paste multiline queries as a whole.
lr.enableBracketedPaste();
static const std::initializer_list<std::pair<String, String>> backslash_aliases =
{
{ "\\l", "SHOW DATABASES" },
@ -2239,7 +2237,18 @@ void ClientBase::runInteractive()
do
{
auto input = lr.readLine(prompt(), ":-] ");
String input;
{
/// Enable bracketed-paste-mode so that we are able to paste multiline queries as a whole.
/// But keep it disabled outside of query input, because it breaks password input
/// (e.g. if we need to reconnect and show a password prompt).
/// (Alternatively, we could make the password input ignore the control sequences.)
lr.enableBracketedPaste();
SCOPE_EXIT({ lr.disableBracketedPaste(); });
input = lr.readLine(prompt(), ":-] ");
}
if (input.empty())
break;

View File

@ -46,7 +46,10 @@ public:
/// clickhouse-client so that without -m flag, one can still paste multiline queries, and
/// possibly get better pasting performance. See https://cirw.in/blog/bracketed-paste for
/// more details.
/// These methods (if implemented) emit the control characters immediately, without waiting
/// for the next readLine() call.
virtual void enableBracketedPaste() {}
virtual void disableBracketedPaste() {}
protected:
enum InputStatus

View File

@ -519,4 +519,10 @@ void ReplxxLineReader::enableBracketedPaste()
rx.enable_bracketed_paste();
}
void ReplxxLineReader::disableBracketedPaste()
{
bracketed_paste_enabled = false;
rx.disable_bracketed_paste();
}
}

View File

@ -19,6 +19,7 @@ public:
~ReplxxLineReader() override;
void enableBracketedPaste() override;
void disableBracketedPaste() override;
/// If highlight is on, we will set a flag to denote whether the last token is a delimiter.
/// This is useful to determine the behavior of <ENTER> key when multiline is enabled.

View File

@ -4,7 +4,6 @@
#include <vector>
#include <memory>
#include <Poco/Version.h>
#include <Poco/Exception.h>
#include <base/defines.h>

View File

@ -1,8 +1,9 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/Macros.h>
#include <Common/Exception.h>
#include <IO/WriteHelpers.h>
#include <Common/logger_useful.h>
#include <Core/ServerUUID.h>
#include <IO/WriteHelpers.h>
namespace DB
@ -11,6 +12,8 @@ namespace DB
namespace ErrorCodes
{
extern const int SYNTAX_ERROR;
extern const int BAD_ARGUMENTS;
extern const int NO_ELEMENTS_IN_CONFIG;
}
Macros::Macros(const Poco::Util::AbstractConfiguration & config, const String & root_key, Poco::Logger * log)
@ -95,7 +98,7 @@ String Macros::expand(const String & s,
else if (macro_name == "uuid" && !info.expand_special_macros_only)
{
if (info.table_id.uuid == UUIDHelpers::Nil)
throw Exception(ErrorCodes::SYNTAX_ERROR, "Macro 'uuid' and empty arguments of ReplicatedMergeTree "
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Macro 'uuid' and empty arguments of ReplicatedMergeTree "
"are supported only for ON CLUSTER queries with Atomic database engine");
/// For ON CLUSTER queries we don't want to require all macros definitions in initiator's config.
/// However, initiator must check that for cross-replication cluster zookeeper_path does not contain {uuid} macro.
@ -105,6 +108,15 @@ String Macros::expand(const String & s,
res += toString(info.table_id.uuid);
info.expanded_uuid = true;
}
else if (macro_name == "server_uuid")
{
auto uuid = ServerUUID::get();
if (UUIDHelpers::Nil == uuid)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Macro {server_uuid} expanded to zero, which means the UUID is not initialized (most likely it's not a server application)");
res += toString(uuid);
info.expanded_other = true;
}
else if (info.shard && macro_name == "shard")
{
res += *info.shard;
@ -125,7 +137,7 @@ String Macros::expand(const String & s,
info.has_unknown = true;
}
else
throw Exception(ErrorCodes::SYNTAX_ERROR, "No macro '{}' in config while processing substitutions in "
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "No macro '{}' in config while processing substitutions in "
"'{}' at '{}' or macro is not supported here", macro_name, s, toString(begin));
pos = end + 1;
@ -142,7 +154,7 @@ String Macros::getValue(const String & key) const
{
if (auto it = macros.find(key); it != macros.end())
return it->second;
throw Exception(ErrorCodes::SYNTAX_ERROR, "No macro {} in config", key);
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "No macro {} in config", key);
}

View File

@ -51,6 +51,9 @@ struct SpaceSavingArena<StringRef>
{
StringRef emplace(StringRef key)
{
if (!key.data)
return key;
return copyStringInArena(arena, key);
}

View File

@ -18,6 +18,7 @@
#include <Common/MemoryTrackerBlockerInThread.h>
#include <Common/SystemLogBase.h>
#include <Common/ThreadPool.h>
#include <Common/logger_useful.h>
#include <base/scope_guard.h>
@ -35,20 +36,18 @@ namespace
constexpr size_t DBMS_SYSTEM_LOG_QUEUE_SIZE = 1048576;
}
ISystemLog::~ISystemLog() = default;
void ISystemLog::stopFlushThread()
{
{
std::lock_guard lock(mutex);
if (!saving_thread.joinable())
{
if (!saving_thread || !saving_thread->joinable())
return;
}
if (is_shutdown)
{
return;
}
is_shutdown = true;
@ -56,13 +55,13 @@ void ISystemLog::stopFlushThread()
flush_event.notify_all();
}
saving_thread.join();
saving_thread->join();
}
void ISystemLog::startup()
{
std::lock_guard lock(mutex);
saving_thread = ThreadFromGlobalPool([this] { savingThreadFunction(); });
saving_thread = std::make_unique<ThreadFromGlobalPool>([this] { savingThreadFunction(); });
}
static thread_local bool recursive_add_call = false;

View File

@ -10,7 +10,7 @@
#include <Interpreters/Context_fwd.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/IStorage_fwd.h>
#include <Common/ThreadPool.h>
#include <Common/ThreadPool_fwd.h>
#define SYSTEM_LOG_ELEMENTS(M) \
M(AsynchronousMetricLogElement) \
@ -60,12 +60,12 @@ public:
/// Stop the background flush thread before destructor. No more data will be written.
virtual void shutdown() = 0;
virtual ~ISystemLog() = default;
virtual ~ISystemLog();
virtual void savingThreadFunction() = 0;
protected:
ThreadFromGlobalPool saving_thread;
std::unique_ptr<ThreadFromGlobalPool> saving_thread;
/// Data shared between callers of add()/flush()/shutdown(), and the saving thread
std::mutex mutex;

View File

@ -17,6 +17,7 @@
#include <Common/ThreadStatus.h>
#include <Common/OpenTelemetryTraceContext.h>
#include <Common/CurrentMetrics.h>
#include <Common/ThreadPool_fwd.h>
#include <base/scope_guard.h>
/** Very simple thread pool similar to boost::threadpool.

View File

@ -0,0 +1,13 @@
#pragma once
template <typename Thread>
class ThreadPoolImpl;
template <bool propagate_opentelemetry_context>
class ThreadFromGlobalPoolImpl;
using ThreadFromGlobalPoolNoTracingContextPropagation = ThreadFromGlobalPoolImpl<false>;
using ThreadFromGlobalPool = ThreadFromGlobalPoolImpl<true>;
using ThreadPool = ThreadPoolImpl<ThreadFromGlobalPoolNoTracingContextPropagation>;

View File

@ -23,12 +23,6 @@ PoolWithFailover PoolFactory::get(const std::string & config_name, unsigned defa
return get(Poco::Util::Application::instance().config(), config_name, default_connections, max_connections, max_tries);
}
/// Duplicate of code from StringUtils.h. Copied here for less dependencies.
static bool startsWith(const std::string & s, const char * prefix)
{
return s.size() >= strlen(prefix) && 0 == memcmp(s.data(), prefix, strlen(prefix));
}
static std::string getPoolEntryName(const Poco::Util::AbstractConfiguration & config,
const std::string & config_name)
{
@ -55,7 +49,7 @@ static std::string getPoolEntryName(const Poco::Util::AbstractConfiguration & co
for (const auto & replica_config_key : replica_keys)
{
/// There could be another elements in the same level in configuration file, like "user", "port"...
if (startsWith(replica_config_key, "replica"))
if (replica_config_key.starts_with("replica"))
{
std::string replica_name = config_name + "." + replica_config_key;
std::string tmp_host = config.getString(replica_name + ".host", host);

View File

@ -10,6 +10,7 @@
#include <libnuraft/nuraft.hxx>
#include <libnuraft/raft_server.hxx>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/ThreadPool.h>
namespace DB
{

View File

@ -12,6 +12,7 @@
#include <Common/StringUtils/StringUtils.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <base/hex.h>
#include <base/scope_guard.h>
#include <Common/logger_useful.h>
#include <Common/setThreadName.h>
#include <Common/LockMemoryExceptionInThread.h>

View File

@ -4,6 +4,7 @@
#include <Common/Stopwatch.h>
#include <Common/CurrentThread.h>
#include <Common/logger_useful.h>
#include <Common/ThreadPool.h>
#include <chrono>
@ -160,7 +161,7 @@ BackgroundSchedulePool::BackgroundSchedulePool(size_t size_, CurrentMetrics::Met
for (auto & thread : threads)
thread = ThreadFromGlobalPoolNoTracingContextPropagation([this] { threadFunction(); });
delayed_thread = ThreadFromGlobalPoolNoTracingContextPropagation([this] { delayExecutionThreadFunction(); });
delayed_thread = std::make_unique<ThreadFromGlobalPoolNoTracingContextPropagation>([this] { delayExecutionThreadFunction(); });
}
@ -198,7 +199,7 @@ BackgroundSchedulePool::~BackgroundSchedulePool()
delayed_tasks_cond_var.notify_all();
LOG_TRACE(&Poco::Logger::get("BackgroundSchedulePool/" + thread_name), "Waiting for threads to finish.");
delayed_thread.join();
delayed_thread->join();
for (auto & thread : threads)
thread.join();

View File

@ -14,7 +14,7 @@
#include <Common/ZooKeeper/Types.h>
#include <Common/CurrentMetrics.h>
#include <Common/CurrentThread.h>
#include <Common/ThreadPool.h>
#include <Common/ThreadPool_fwd.h>
#include <base/scope_guard.h>
@ -86,7 +86,7 @@ private:
std::condition_variable delayed_tasks_cond_var;
std::mutex delayed_tasks_mutex;
/// Thread waiting for next delayed task.
ThreadFromGlobalPoolNoTracingContextPropagation delayed_thread;
std::unique_ptr<ThreadFromGlobalPoolNoTracingContextPropagation> delayed_thread;
/// Tasks ordered by scheduled time.
DelayedTasks delayed_tasks;

View File

@ -15,7 +15,6 @@
#include <Poco/Util/Application.h>
#include <Poco/Util/ServerApplication.h>
#include <Poco/Net/SocketAddress.h>
#include <Poco/Version.h>
#include <base/types.h>
#include <Common/logger_useful.h>
#include <base/getThreadId.h>

View File

@ -238,12 +238,15 @@ void SerializationBool::deserializeTextJSON(IColumn &column, ReadBuffer &istr, c
ColumnUInt8 * col = checkAndGetDeserializeColumnType(column);
bool value = false;
if (*istr.position() == 't' || *istr.position() == 'f')
char first_char = *istr.position();
if (first_char == 't' || first_char == 'f')
readBoolTextWord(value, istr);
else if (*istr.position() == '1' || *istr.position() == '0')
else if (first_char == '1' || first_char == '0')
readBoolText(value, istr);
else
throw Exception(ErrorCodes::CANNOT_PARSE_BOOL, "Invalid boolean value, should be true/false, 1/0.");
throw Exception(ErrorCodes::CANNOT_PARSE_BOOL,
"Invalid boolean value, should be true/false, 1/0, but it starts with the '{}' character.", first_char);
col->insert(value);
}

View File

@ -1,6 +1,6 @@
#pragma once
#include <Common/ThreadPool.h>
#include <Interpreters/Context_fwd.h>
#include <Databases/IDatabase.h>
namespace DB

View File

@ -7,7 +7,7 @@
#include <Storages/IStorage_fwd.h>
#include <base/types.h>
#include <Common/Exception.h>
#include <Common/ThreadPool.h>
#include <Common/ThreadPool_fwd.h>
#include <QueryPipeline/BlockIO.h>
#include <ctime>

View File

@ -67,7 +67,6 @@ void registerDictionarySourceMongoDB(DictionarySourceFactory & factory)
#include <Poco/MongoDB/ObjectId.h>
#include <Poco/URI.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Version.h>
// only after poco
// naming conflict:

View File

@ -254,8 +254,8 @@ public:
virtual NameSet getCacheLayersNames() const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Method `getCacheLayersNames()` is not implemented for disk: {}",
getDataSourceDescription().type);
"Method `getCacheLayersNames()` is not implemented for disk: {}",
toString(getDataSourceDescription().type));
}
/// Returns a list of storage objects (contains path, size, ...).
@ -263,7 +263,9 @@ public:
/// be multiple files in remote fs for single clickhouse file.
virtual StoredObjects getStorageObjects(const String &) const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method `getStorageObjects() not implemented for disk: {}`", getDataSourceDescription().type);
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Method `getStorageObjects()` not implemented for disk: {}",
toString(getDataSourceDescription().type));
}
/// For one local path there might be multiple remote paths in case of Log family engines.
@ -281,8 +283,8 @@ public:
virtual void getRemotePathsRecursive(const String &, std::vector<LocalPathWithObjectStoragePaths> &)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Method `getRemotePathsRecursive() not implemented for disk: {}`",
getDataSourceDescription().type);
"Method `getRemotePathsRecursive() not implemented for disk: {}`",
toString(getDataSourceDescription().type));
}
/// Batch request to remove multiple files.
@ -398,7 +400,7 @@ public:
throw Exception(
ErrorCodes::NOT_IMPLEMENTED,
"Method getObjectStorage() is not implemented for disk type: {}",
getDataSourceDescription().type);
toString(getDataSourceDescription().type));
}
/// Create disk object storage according to disk type.
@ -409,7 +411,7 @@ public:
throw Exception(
ErrorCodes::NOT_IMPLEMENTED,
"Method createDiskObjectStorage() is not implemented for disk type: {}",
getDataSourceDescription().type);
toString(getDataSourceDescription().type));
}
virtual bool supportsStat() const { return false; }

View File

@ -1,15 +1,16 @@
#include "IOUringReader.h"
#include <memory>
#if USE_LIBURING
#include <base/errnoToString.h>
#include <Common/assert_cast.h>
#include <Common/Exception.h>
#include <Common/MemorySanitizer.h>
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
#include <Common/Stopwatch.h>
#include <Common/setThreadName.h>
#include <Common/ThreadPool.h>
#include <Common/logger_useful.h>
#include <future>
@ -44,7 +45,7 @@ namespace ErrorCodes
}
IOUringReader::IOUringReader(uint32_t entries_)
: log(&Poco::Logger::get("IOUringReader"))
: log(&Poco::Logger::get("IOUringReader"))
{
struct io_uring_probe * probe = io_uring_get_probe();
if (!probe)
@ -70,7 +71,7 @@ IOUringReader::IOUringReader(uint32_t entries_)
throwFromErrno("Failed initializing io_uring", ErrorCodes::IO_URING_INIT_FAILED, -ret);
cq_entries = params.cq_entries;
ring_completion_monitor = ThreadFromGlobalPool([this] { monitorRing(); });
ring_completion_monitor = std::make_unique<ThreadFromGlobalPool>([this] { monitorRing(); });
}
std::future<IAsynchronousReader::Result> IOUringReader::submit(Request request)
@ -333,7 +334,7 @@ IOUringReader::~IOUringReader()
io_uring_submit(&ring);
}
ring_completion_monitor.join();
ring_completion_monitor->join();
io_uring_queue_exit(&ring);
}

View File

@ -4,15 +4,20 @@
#if USE_LIBURING
#include <Common/ThreadPool.h>
#include <Common/Exception.h>
#include <Common/ThreadPool_fwd.h>
#include <IO/AsynchronousReader.h>
#include <deque>
#include <unordered_map>
#include <liburing.h>
namespace Poco { class Logger; }
namespace DB
{
class Exception;
/** Perform reads using the io_uring Linux subsystem.
*
* The class sets up a single io_uring that clients submit read requests to, they are
@ -30,7 +35,7 @@ private:
uint32_t cq_entries;
std::atomic<bool> cancelled{false};
ThreadFromGlobalPool ring_completion_monitor;
std::unique_ptr<ThreadFromGlobalPool> ring_completion_monitor;
struct EnqueuedRequest
{
@ -74,7 +79,7 @@ public:
void wait() override {}
virtual ~IOUringReader() override;
~IOUringReader() override;
};
}

View File

@ -8,6 +8,7 @@
#include <Common/setThreadName.h>
#include <Common/MemorySanitizer.h>
#include <Common/CurrentThread.h>
#include <Common/ThreadPool.h>
#include <Poco/Environment.h>
#include <base/errnoToString.h>
#include <Poco/Event.h>
@ -87,7 +88,7 @@ static bool hasBugInPreadV2()
#endif
ThreadPoolReader::ThreadPoolReader(size_t pool_size, size_t queue_size_)
: pool(CurrentMetrics::ThreadPoolFSReaderThreads, CurrentMetrics::ThreadPoolFSReaderThreadsActive, pool_size, pool_size, queue_size_)
: pool(std::make_unique<ThreadPool>(CurrentMetrics::ThreadPoolFSReaderThreads, CurrentMetrics::ThreadPoolFSReaderThreadsActive, pool_size, pool_size, queue_size_))
{
}
@ -200,7 +201,7 @@ std::future<IAsynchronousReader::Result> ThreadPoolReader::submit(Request reques
ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheMiss);
auto schedule = threadPoolCallbackRunner<Result>(pool, "ThreadPoolRead");
auto schedule = threadPoolCallbackRunner<Result>(*pool, "ThreadPoolRead");
return schedule([request, fd]() -> Result
{
@ -244,4 +245,9 @@ std::future<IAsynchronousReader::Result> ThreadPoolReader::submit(Request reques
}, request.priority);
}
void ThreadPoolReader::wait()
{
pool->wait();
}
}

View File

@ -1,7 +1,8 @@
#pragma once
#include <memory>
#include <IO/AsynchronousReader.h>
#include <Common/ThreadPool.h>
#include <Common/ThreadPool_fwd.h>
#include <Interpreters/threadPoolCallbackRunner.h>
@ -28,14 +29,14 @@ namespace DB
class ThreadPoolReader final : public IAsynchronousReader
{
private:
ThreadPool pool;
std::unique_ptr<ThreadPool> pool;
public:
ThreadPoolReader(size_t pool_size, size_t queue_size_);
std::future<Result> submit(Request request) override;
void wait() override { pool.wait(); }
void wait() override;
/// pool automatically waits for all tasks in destructor.
};

View File

@ -1,6 +1,7 @@
#include "ThreadPoolRemoteFSReader.h"
#include "config.h"
#include <Common/ThreadPool_fwd.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
@ -14,6 +15,7 @@
#include <base/getThreadId.h>
#include <future>
#include <memory>
namespace ProfileEvents
@ -62,7 +64,7 @@ IAsynchronousReader::Result RemoteFSFileDescriptor::readInto(char * data, size_t
ThreadPoolRemoteFSReader::ThreadPoolRemoteFSReader(size_t pool_size, size_t queue_size_)
: pool(CurrentMetrics::ThreadPoolRemoteFSReaderThreads, CurrentMetrics::ThreadPoolRemoteFSReaderThreadsActive, pool_size, pool_size, queue_size_)
: pool(std::make_unique<ThreadPool>(CurrentMetrics::ThreadPoolRemoteFSReaderThreads, CurrentMetrics::ThreadPoolRemoteFSReaderThreadsActive, pool_size, pool_size, queue_size_))
{
}
@ -92,7 +94,12 @@ std::future<IAsynchronousReader::Result> ThreadPoolRemoteFSReader::submit(Reques
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderReadBytes, result.size);
return Result{ .size = result.size, .offset = result.offset, .execution_watch = std::move(watch) };
}, pool, "VFSRead", request.priority);
}, *pool, "VFSRead", request.priority);
}
void ThreadPoolRemoteFSReader::wait()
{
pool->wait();
}
}

View File

@ -2,7 +2,7 @@
#include <IO/AsynchronousReader.h>
#include <IO/ReadBuffer.h>
#include <Common/ThreadPool.h>
#include <Common/ThreadPool_fwd.h>
#include <Interpreters/threadPoolCallbackRunner.h>
namespace DB
@ -15,10 +15,10 @@ public:
std::future<IAsynchronousReader::Result> submit(Request request) override;
void wait() override { pool.wait(); }
void wait() override;
private:
ThreadPool pool;
std::unique_ptr<ThreadPool> pool;
};
class RemoteFSFileDescriptor : public IAsynchronousReader::IFileDescriptor

View File

@ -16,7 +16,7 @@
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <Disks/ObjectStorages/StoredObject.h>
#include <Disks/DiskType.h>
#include <Common/ThreadPool.h>
#include <Common/ThreadPool_fwd.h>
#include <Disks/WriteMode.h>

View File

@ -8,7 +8,6 @@
#if USE_AWS_S3
#include <aws/core/client/DefaultRetryStrategy.h>
#include <base/getFQDNOrHostName.h>
#include <Disks/DiskLocal.h>
@ -19,9 +18,7 @@
#include <Disks/ObjectStorages/S3/diskSettings.h>
#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
#include <Disks/ObjectStorages/MetadataStorageFromPlainObjectStorage.h>
#include <IO/S3Common.h>
#include <Storages/StorageS3Settings.h>
#include <Core/ServerUUID.h>
#include <Common/Macros.h>
@ -87,10 +84,10 @@ public:
private:
static String getServerUUID()
{
DB::UUID server_uuid = DB::ServerUUID::get();
if (server_uuid == DB::UUIDHelpers::Nil)
UUID server_uuid = ServerUUID::get();
if (server_uuid == UUIDHelpers::Nil)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Server UUID is not initialized");
return DB::toString(server_uuid);
return toString(server_uuid);
}
};

View File

@ -2,6 +2,7 @@
#include <memory>
#include <optional>
#include <queue>
#include <Disks/IVolume.h>

View File

@ -15,6 +15,7 @@
#include <Functions/FunctionHelpers.h>
#include <Functions/IFunction.h>
#include <Interpreters/Context.h>
#include <base/scope_guard.h>
#if USE_SSL
#include <openssl/x509v3.h>

View File

@ -494,6 +494,28 @@ struct GccMurmurHashImpl
static constexpr bool use_int_hash_for_pods = false;
};
/// To be compatible with Default Partitioner in Kafka:
/// murmur2: https://github.com/apache/kafka/blob/461c5cfe056db0951d9b74f5adc45973670404d7/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L480
/// Default Partitioner: https://github.com/apache/kafka/blob/139f7709bd3f5926901a21e55043388728ccca78/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java#L328
struct KafkaMurmurHashImpl
{
static constexpr auto name = "kafkaMurmurHash";
using ReturnType = UInt32;
static UInt32 apply(const char * data, const size_t size)
{
return MurmurHash2(data, size, 0x9747b28cU) & 0x7fffffff;
}
static UInt32 combineHashes(UInt32 h1, UInt32 h2)
{
return IntHash32Impl::apply(h1) ^ h2;
}
static constexpr bool use_int_hash_for_pods = false;
};
struct MurmurHash3Impl32
{
static constexpr auto name = "murmurHash3_32";
@ -1727,6 +1749,7 @@ using FunctionMetroHash64 = FunctionAnyHash<ImplMetroHash64>;
using FunctionMurmurHash2_32 = FunctionAnyHash<MurmurHash2Impl32>;
using FunctionMurmurHash2_64 = FunctionAnyHash<MurmurHash2Impl64>;
using FunctionGccMurmurHash = FunctionAnyHash<GccMurmurHashImpl>;
using FunctionKafkaMurmurHash = FunctionAnyHash<KafkaMurmurHashImpl>;
using FunctionMurmurHash3_32 = FunctionAnyHash<MurmurHash3Impl32>;
using FunctionMurmurHash3_64 = FunctionAnyHash<MurmurHash3Impl64>;
using FunctionMurmurHash3_128 = FunctionAnyHash<MurmurHash3Impl128>;

View File

@ -17,5 +17,6 @@ REGISTER_FUNCTION(HashingMurmur)
factory.registerFunction<FunctionMurmurHash3_64>();
factory.registerFunction<FunctionMurmurHash3_128>();
factory.registerFunction<FunctionGccMurmurHash>();
factory.registerFunction<FunctionKafkaMurmurHash>();
}
}

View File

@ -70,16 +70,16 @@ namespace ErrorCodes
namespace
{
inline bool startsWith(const char * s, const char * end, const char * prefix)
inline bool startsWith(const char * s, const char * end, const std::string_view prefix)
{
return s + strlen(prefix) < end && 0 == memcmp(s, prefix, strlen(prefix));
return s + prefix.length() < end && 0 == memcmp(s, prefix.data(), prefix.length());
}
inline bool checkAndSkip(const char * __restrict & s, const char * end, const char * prefix)
inline bool checkAndSkip(const char * __restrict & s, const char * end, const std::string_view prefix)
{
if (startsWith(s, end, prefix))
{
s += strlen(prefix);
s += prefix.length();
return true;
}
return false;
@ -138,7 +138,7 @@ bool processCDATA(const char * __restrict & src, const char * end, char * __rest
return true;
}
bool processElementAndSkipContent(const char * __restrict & src, const char * end, const char * tag_name)
bool processElementAndSkipContent(const char * __restrict & src, const char * end, const std::string_view tag_name)
{
const auto * old_src = src;

View File

@ -1,5 +1,6 @@
#include <IO/BackupsIOThreadPool.h>
#include <Common/CurrentMetrics.h>
#include <Common/ThreadPool.h>
#include <Core/Field.h>
namespace CurrentMetrics

View File

@ -1,6 +1,8 @@
#pragma once
#include <Common/ThreadPool.h>
#include <Common/ThreadPool_fwd.h>
#include <cstdlib>
#include <memory>
namespace DB
{

View File

@ -7,8 +7,6 @@
#include <Common/ProfileEvents.h>
#include <Common/SipHash.h>
#include <Poco/Version.h>
#include "config.h"
#if USE_SSL

View File

@ -1,5 +1,6 @@
#include <IO/IOThreadPool.h>
#include <Common/CurrentMetrics.h>
#include <Common/ThreadPool.h>
#include <Core/Field.h>
namespace CurrentMetrics

View File

@ -1,6 +1,8 @@
#pragma once
#include <Common/ThreadPool.h>
#include <Common/ThreadPool_fwd.h>
#include <cstdlib>
#include <memory>
namespace DB
{

View File

@ -5,7 +5,6 @@
#include <IO/SeekableReadBuffer.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Common/ArenaWithFreeLists.h>
#include <Common/ThreadPool.h>
namespace DB
{

View File

@ -1,4 +1,5 @@
#include <IO/ReadBufferFromFileBase.h>
#include <IO/Progress.h>
#include <Interpreters/Context.h>
namespace DB

View File

@ -21,7 +21,6 @@
#include <Poco/Net/HTTPResponse.h>
#include <Poco/URI.h>
#include <Poco/URIStreamFactory.h>
#include <Poco/Version.h>
#include <Common/DNSResolver.h>
#include <Common/RemoteHostFilter.h>
#include "config.h"

View File

@ -10,7 +10,6 @@
#include <base/types.h>
#include <Common/logger_useful.h>
#include <Common/ThreadPool.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteSettings.h>

View File

@ -23,11 +23,11 @@ ZstdDeflatingAppendableWriteBuffer::ZstdDeflatingAppendableWriteBuffer(
{
cctx = ZSTD_createCCtx();
if (cctx == nullptr)
throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED, "zstd stream encoder init failed: zstd version: {}", ZSTD_VERSION_STRING);
throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED, "ZSTD stream encoder init failed: ZSTD version: {}", ZSTD_VERSION_STRING);
size_t ret = ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, compression_level);
if (ZSTD_isError(ret))
throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED,
"zstd stream encoder option setting failed: error code: {}; zstd version: {}",
"ZSTD stream encoder option setting failed: error code: {}; zstd version: {}",
ret, ZSTD_VERSION_STRING);
input = {nullptr, 0, 0};
@ -64,7 +64,7 @@ void ZstdDeflatingAppendableWriteBuffer::nextImpl()
if (ZSTD_isError(compression_result))
throw Exception(
ErrorCodes::ZSTD_ENCODER_FAILED,
"Zstd stream encoding failed: error code: {}; zstd version: {}",
"ZSTD stream decoding failed: error code: {}; ZSTD version: {}",
ZSTD_getErrorName(compression_result), ZSTD_VERSION_STRING);
first_write = false;
@ -138,7 +138,7 @@ void ZstdDeflatingAppendableWriteBuffer::finalizeBefore()
{
if (ZSTD_isError(remaining))
throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED,
"Zstd stream encoder end failed: error: '{}' zstd version: {}",
"ZSTD stream encoder end failed: error: '{}' ZSTD version: {}",
ZSTD_getErrorName(remaining), ZSTD_VERSION_STRING);
remaining = ZSTD_compressStream2(cctx, &output, &input, ZSTD_e_end);

View File

@ -63,7 +63,7 @@ void ZstdDeflatingWriteBuffer::nextImpl()
if (ZSTD_isError(compression_result))
throw Exception(
ErrorCodes::ZSTD_ENCODER_FAILED,
"Zstd stream encoding failed: error: '{}'; zstd version: {}",
"ZSTD stream encoding failed: error: '{}'; zstd version: {}",
ZSTD_getErrorName(compression_result), ZSTD_VERSION_STRING);
out->position() = out->buffer().begin() + output.pos;

View File

@ -1,4 +1,5 @@
#include <IO/ZstdInflatingReadBuffer.h>
#include <zstd_errors.h>
namespace DB
@ -56,11 +57,17 @@ bool ZstdInflatingReadBuffer::nextImpl()
/// Decompress data and check errors.
size_t ret = ZSTD_decompressStream(dctx, &output, &input);
if (ZSTD_isError(ret))
if (ZSTD_getErrorCode(ret))
{
throw Exception(
ErrorCodes::ZSTD_DECODER_FAILED,
"Zstd stream encoding failed: error '{}'; zstd version: {}",
ZSTD_getErrorName(ret), ZSTD_VERSION_STRING);
ErrorCodes::ZSTD_DECODER_FAILED,
"ZSTD stream decoding failed: error '{}'{}; ZSTD version: {}",
ZSTD_getErrorName(ret),
ZSTD_error_frameParameter_windowTooLarge == ret
? ". You can increase the maximum window size with the 'zstd_window_log_max' setting in ClickHouse. Example: 'SET zstd_window_log_max = 31'"
: "",
ZSTD_VERSION_STRING);
}
/// Check that something has changed after decompress (input or output position)
assert(in->eof() || output.pos > 0 || in->position() < in->buffer().begin() + input.pos);

View File

@ -242,8 +242,9 @@ void QueryCache::Writer::finalizeWrite()
Chunks squashed_chunks;
size_t rows_remaining_in_squashed = 0; /// how many further rows can the last squashed chunk consume until it reaches max_block_size
for (const auto & chunk : *query_result)
for (auto & chunk : *query_result)
{
convertToFullIfSparse(chunk);
const size_t rows_chunk = chunk.getNumRows();
size_t rows_chunk_processed = 0;

View File

@ -5,7 +5,7 @@
#include <Common/MultiVersion.h>
#include <Common/OpenTelemetryTraceContext.h>
#include <Common/RemoteHostFilter.h>
#include <Common/ThreadPool.h>
#include <Common/ThreadPool_fwd.h>
#include <Common/Throttler_fwd.h>
#include <Core/Block.h>
#include <Core/NamesAndTypes.h>

View File

@ -3,6 +3,7 @@
#include <Interpreters/SystemLog.h>
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>
#include <Core/Field.h>
/// Call this function on crash.

View File

@ -31,9 +31,11 @@
#include <base/getFQDNOrHostName.h>
#include <Common/logger_useful.h>
#include <base/sort.h>
#include <memory>
#include <random>
#include <pcg_random.hpp>
#include <Common/scope_guard_safe.h>
#include <Common/ThreadPool.h>
#include <Interpreters/ZooKeeperLog.h>
@ -121,8 +123,8 @@ void DDLWorker::startup()
{
[[maybe_unused]] bool prev_stop_flag = stop_flag.exchange(false);
chassert(prev_stop_flag);
main_thread = ThreadFromGlobalPool(&DDLWorker::runMainThread, this);
cleanup_thread = ThreadFromGlobalPool(&DDLWorker::runCleanupThread, this);
main_thread = std::make_unique<ThreadFromGlobalPool>(&DDLWorker::runMainThread, this);
cleanup_thread = std::make_unique<ThreadFromGlobalPool>(&DDLWorker::runCleanupThread, this);
}
void DDLWorker::shutdown()
@ -132,8 +134,10 @@ void DDLWorker::shutdown()
{
queue_updated_event->set();
cleanup_event->set();
main_thread.join();
cleanup_thread.join();
if (main_thread)
main_thread->join();
if (cleanup_thread)
cleanup_thread->join();
worker_pool.reset();
}
}

View File

@ -3,7 +3,7 @@
#include <Common/CurrentThread.h>
#include <Common/CurrentMetrics.h>
#include <Common/DNSResolver.h>
#include <Common/ThreadPool.h>
#include <Common/ThreadPool_fwd.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Storages/IStorage_fwd.h>
#include <Parsers/IAST_fwd.h>
@ -145,8 +145,8 @@ protected:
std::atomic<bool> initialized = false;
std::atomic<bool> stop_flag = true;
ThreadFromGlobalPool main_thread;
ThreadFromGlobalPool cleanup_thread;
std::unique_ptr<ThreadFromGlobalPool> main_thread;
std::unique_ptr<ThreadFromGlobalPool> cleanup_thread;
/// Size of the pool for query execution.
size_t pool_size = 1;

View File

@ -8,16 +8,17 @@
#include <Databases/DatabaseMemory.h>
#include <Databases/DatabaseOnDisk.h>
#include <Disks/IDisk.h>
#include <Common/quoteString.h>
#include <Storages/StorageMemory.h>
#include <Core/BackgroundSchedulePool.h>
#include <Parsers/formatAST.h>
#include <IO/ReadHelpers.h>
#include <Poco/DirectoryIterator.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/quoteString.h>
#include <Common/atomicRename.h>
#include <Common/CurrentMetrics.h>
#include <Common/logger_useful.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/ThreadPool.h>
#include <Common/filesystemHelpers.h>
#include <Common/noexcept_scope.h>
#include <Common/checkStackSize.h>

View File

@ -1,4 +1,5 @@
#include <Interpreters/MetricLog.h>
#include <Common/ThreadPool.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
@ -58,7 +59,7 @@ void MetricLog::startCollectMetric(size_t collect_interval_milliseconds_)
{
collect_interval_milliseconds = collect_interval_milliseconds_;
is_shutdown_metric_thread = false;
metric_flush_thread = ThreadFromGlobalPool([this] { metricThreadFunction(); });
metric_flush_thread = std::make_unique<ThreadFromGlobalPool>([this] { metricThreadFunction(); });
}
@ -67,7 +68,8 @@ void MetricLog::stopCollectMetric()
bool old_val = false;
if (!is_shutdown_metric_thread.compare_exchange_strong(old_val, true))
return;
metric_flush_thread.join();
if (metric_flush_thread)
metric_flush_thread->join();
}

View File

@ -3,6 +3,7 @@
#include <Interpreters/SystemLog.h>
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
#include <Common/ThreadPool_fwd.h>
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>
@ -50,7 +51,7 @@ public:
private:
void metricThreadFunction();
ThreadFromGlobalPool metric_flush_thread;
std::unique_ptr<ThreadFromGlobalPool> metric_flush_thread;
size_t collect_interval_milliseconds;
std::atomic<bool> is_shutdown_metric_thread{false};
};

View File

@ -1,6 +1,7 @@
#pragma once
#include <Interpreters/SystemLog.h>
#include <Common/OpenTelemetryTraceContext.h>
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>

View File

@ -2,16 +2,11 @@
#include <Interpreters/SystemLog.h>
#include <Interpreters/ClientInfo.h>
#include <Common/ProfileEvents.h>
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>
namespace ProfileEvents
{
class Counters;
}
namespace DB
{

View File

@ -15,7 +15,6 @@
#include <Common/quoteString.h>
#include <base/range.h>
#include <Poco/URI.h>
#include <Poco/Version.h>
// only after poco
// naming conflict:

View File

@ -22,6 +22,7 @@
#include <Common/setThreadName.h>
#include <Core/MySQL/Authentication.h>
#include <Common/logger_useful.h>
#include <base/scope_guard.h>
#include "config_version.h"

View File

@ -275,7 +275,7 @@ public:
/// acquiring the lock instead of raising a TABLE_IS_DROPPED exception
TableLockHolder tryLockForShare(const String & query_id, const std::chrono::milliseconds & acquire_timeout);
/// Lock table for alter. This lock must be acuqired in ALTER queries to be
/// Lock table for alter. This lock must be acquired in ALTER queries to be
/// sure, that we execute only one simultaneous alter. Doesn't affect share lock.
using AlterLockHolder = std::unique_lock<std::timed_mutex>;
AlterLockHolder lockForAlter(const std::chrono::milliseconds & acquire_timeout);

View File

@ -1,7 +1,6 @@
#pragma once
#include <Storages/MergeTree/MergeTreeBackgroundExecutor.h>
#include <Common/ThreadPool.h>
#include <Core/BackgroundSchedulePool.h>
#include <pcg_random.hpp>

View File

@ -34,6 +34,8 @@
#include <Interpreters/MergeTreeTransaction.h>
#include <Interpreters/TransactionLog.h>
#include <Disks/IO/CachedOnDiskReadBufferFromFile.h>
namespace CurrentMetrics
{
@ -1525,6 +1527,10 @@ bool IMergeTreeDataPart::assertHasValidVersionMetadata() const
size_t file_size = getDataPartStorage().getFileSize(TXN_VERSION_METADATA_FILE_NAME);
auto buf = getDataPartStorage().readFile(TXN_VERSION_METADATA_FILE_NAME, ReadSettings().adjustBufferSize(file_size), file_size, std::nullopt);
/// FIXME https://github.com/ClickHouse/ClickHouse/issues/48465
if (dynamic_cast<CachedOnDiskReadBufferFromFile *>(buf.get()))
return true;
readStringUntilEOF(content, *buf);
ReadBufferFromString str_buf{content};
VersionMetadata file;

View File

@ -2,21 +2,77 @@
#include <algorithm>
#include <Common/ThreadPool.h>
#include <Common/setThreadName.h>
#include <Common/Exception.h>
#include <Storages/MergeTree/BackgroundJobsAssignee.h>
#include <Common/noexcept_scope.h>
namespace CurrentMetrics
{
extern const Metric MergeTreeBackgroundExecutorThreads;
extern const Metric MergeTreeBackgroundExecutorThreadsActive;
}
namespace DB
{
namespace ErrorCodes
{
extern const int ABORTED;
extern const int INVALID_CONFIG_PARAMETER;
}
template <class Queue>
MergeTreeBackgroundExecutor<Queue>::MergeTreeBackgroundExecutor(
String name_,
size_t threads_count_,
size_t max_tasks_count_,
CurrentMetrics::Metric metric_,
CurrentMetrics::Metric max_tasks_metric_)
: name(name_)
, threads_count(threads_count_)
, max_tasks_count(max_tasks_count_)
, metric(metric_)
, max_tasks_metric(max_tasks_metric_, 2 * max_tasks_count) // active + pending
, pool(std::make_unique<ThreadPool>(CurrentMetrics::MergeTreeBackgroundExecutorThreads, CurrentMetrics::MergeTreeBackgroundExecutorThreadsActive))
{
if (max_tasks_count == 0)
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Task count for MergeTreeBackgroundExecutor must not be zero");
pending.setCapacity(max_tasks_count);
active.set_capacity(max_tasks_count);
pool->setMaxThreads(std::max(1UL, threads_count));
pool->setMaxFreeThreads(std::max(1UL, threads_count));
pool->setQueueSize(std::max(1UL, threads_count));
for (size_t number = 0; number < threads_count; ++number)
pool->scheduleOrThrowOnError([this] { threadFunction(); });
}
template <class Queue>
MergeTreeBackgroundExecutor<Queue>::MergeTreeBackgroundExecutor(
String name_,
size_t threads_count_,
size_t max_tasks_count_,
CurrentMetrics::Metric metric_,
CurrentMetrics::Metric max_tasks_metric_,
std::string_view policy)
requires requires(Queue queue) { queue.updatePolicy(policy); } // Because we use explicit template instantiation
: MergeTreeBackgroundExecutor(name_, threads_count_, max_tasks_count_, metric_, max_tasks_metric_)
{
pending.updatePolicy(policy);
}
template <class Queue>
MergeTreeBackgroundExecutor<Queue>::~MergeTreeBackgroundExecutor()
{
wait();
}
template <class Queue>
void MergeTreeBackgroundExecutor<Queue>::wait()
{
@ -26,7 +82,7 @@ void MergeTreeBackgroundExecutor<Queue>::wait()
has_tasks.notify_all();
}
pool.wait();
pool->wait();
}
template <class Queue>
@ -52,12 +108,12 @@ void MergeTreeBackgroundExecutor<Queue>::increaseThreadsAndMaxTasksCount(size_t
pending.setCapacity(new_max_tasks_count);
active.set_capacity(new_max_tasks_count);
pool.setMaxThreads(std::max(1UL, new_threads_count));
pool.setMaxFreeThreads(std::max(1UL, new_threads_count));
pool.setQueueSize(std::max(1UL, new_threads_count));
pool->setMaxThreads(std::max(1UL, new_threads_count));
pool->setMaxFreeThreads(std::max(1UL, new_threads_count));
pool->setQueueSize(std::max(1UL, new_threads_count));
for (size_t number = threads_count; number < new_threads_count; ++number)
pool.scheduleOrThrowOnError([this] { threadFunction(); });
pool->scheduleOrThrowOnError([this] { threadFunction(); });
max_tasks_metric.changeTo(2 * new_max_tasks_count); // pending + active
max_tasks_count.store(new_max_tasks_count, std::memory_order_relaxed);
@ -161,13 +217,10 @@ void MergeTreeBackgroundExecutor<Queue>::routine(TaskRuntimeDataPtr item)
if (item->is_currently_deleting)
{
/// This is significant to order the destructors.
{
NOEXCEPT_SCOPE({
ALLOW_ALLOCATIONS_IN_SCOPE;
item->task.reset();
});
}
NOEXCEPT_SCOPE({
ALLOW_ALLOCATIONS_IN_SCOPE;
item->task.reset();
});
item->is_done.set();
item = nullptr;
return;

View File

@ -15,24 +15,14 @@
#include <Common/CurrentMetrics.h>
#include <Common/logger_useful.h>
#include <Common/ThreadPool.h>
#include <Common/ThreadPool_fwd.h>
#include <Common/Stopwatch.h>
#include <base/defines.h>
#include <Storages/MergeTree/IExecutableTask.h>
namespace CurrentMetrics
{
extern const Metric MergeTreeBackgroundExecutorThreads;
extern const Metric MergeTreeBackgroundExecutorThreadsActive;
}
namespace DB
{
namespace ErrorCodes
{
extern const int INVALID_CONFIG_PARAMETER;
}
struct TaskRuntimeData;
using TaskRuntimeDataPtr = std::shared_ptr<TaskRuntimeData>;
@ -255,28 +245,7 @@ public:
size_t threads_count_,
size_t max_tasks_count_,
CurrentMetrics::Metric metric_,
CurrentMetrics::Metric max_tasks_metric_)
: name(name_)
, threads_count(threads_count_)
, max_tasks_count(max_tasks_count_)
, metric(metric_)
, max_tasks_metric(max_tasks_metric_, 2 * max_tasks_count) // active + pending
, pool(CurrentMetrics::MergeTreeBackgroundExecutorThreads, CurrentMetrics::MergeTreeBackgroundExecutorThreadsActive)
{
if (max_tasks_count == 0)
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Task count for MergeTreeBackgroundExecutor must not be zero");
pending.setCapacity(max_tasks_count);
active.set_capacity(max_tasks_count);
pool.setMaxThreads(std::max(1UL, threads_count));
pool.setMaxFreeThreads(std::max(1UL, threads_count));
pool.setQueueSize(std::max(1UL, threads_count));
for (size_t number = 0; number < threads_count; ++number)
pool.scheduleOrThrowOnError([this] { threadFunction(); });
}
CurrentMetrics::Metric max_tasks_metric_);
MergeTreeBackgroundExecutor(
String name_,
size_t threads_count_,
@ -284,16 +253,8 @@ public:
CurrentMetrics::Metric metric_,
CurrentMetrics::Metric max_tasks_metric_,
std::string_view policy)
requires requires(Queue queue) { queue.updatePolicy(policy); } // Because we use explicit template instantiation
: MergeTreeBackgroundExecutor(name_, threads_count_, max_tasks_count_, metric_, max_tasks_metric_)
{
pending.updatePolicy(policy);
}
~MergeTreeBackgroundExecutor()
{
wait();
}
requires requires(Queue queue) { queue.updatePolicy(policy); }; // Because we use explicit template instantiation
~MergeTreeBackgroundExecutor();
/// Handler for hot-reloading
/// Supports only increasing the number of threads and tasks, because
@ -335,7 +296,7 @@ private:
mutable std::mutex mutex;
std::condition_variable has_tasks TSA_GUARDED_BY(mutex);
bool shutdown TSA_GUARDED_BY(mutex) = false;
ThreadPool pool;
std::unique_ptr<ThreadPool> pool;
Poco::Logger * log = &Poco::Logger::get("MergeTreeBackgroundExecutor");
};

View File

@ -6060,51 +6060,48 @@ bool MergeTreeData::isPrimaryOrMinMaxKeyColumnPossiblyWrappedInFunctions(
}
bool MergeTreeData::mayBenefitFromIndexForIn(
const ASTPtr & left_in_operand, ContextPtr, const StorageMetadataPtr & metadata_snapshot) const
const ASTPtr & left_in_operand, ContextPtr query_context, const StorageMetadataPtr & metadata_snapshot) const
{
/// Make sure that the left side of the IN operator contain part of the key.
/// If there is a tuple on the left side of the IN operator, at least one item of the tuple
/// must be part of the key (probably wrapped by a chain of some acceptable functions).
/// must be part of the key (probably wrapped by a chain of some acceptable functions).
const auto * left_in_operand_tuple = left_in_operand->as<ASTFunction>();
const auto & index_wrapper_factory = MergeTreeIndexFactory::instance();
const auto & index_factory = MergeTreeIndexFactory::instance();
const auto & query_settings = query_context->getSettingsRef();
auto check_for_one_argument = [&](const auto & ast)
{
if (isPrimaryOrMinMaxKeyColumnPossiblyWrappedInFunctions(ast, metadata_snapshot))
return true;
if (query_settings.use_skip_indexes)
{
for (const auto & index : metadata_snapshot->getSecondaryIndices())
if (index_factory.get(index)->mayBenefitFromIndexForIn(ast))
return true;
}
if (query_settings.allow_experimental_projection_optimization)
{
for (const auto & projection : metadata_snapshot->getProjections())
if (projection.isPrimaryKeyColumnPossiblyWrappedInFunctions(ast))
return true;
}
return false;
};
if (left_in_operand_tuple && left_in_operand_tuple->name == "tuple")
{
for (const auto & item : left_in_operand_tuple->arguments->children)
{
if (isPrimaryOrMinMaxKeyColumnPossiblyWrappedInFunctions(item, metadata_snapshot))
return true;
for (const auto & index : metadata_snapshot->getSecondaryIndices())
if (index_wrapper_factory.get(index)->mayBenefitFromIndexForIn(item))
return true;
for (const auto & projection : metadata_snapshot->getProjections())
{
if (projection.isPrimaryKeyColumnPossiblyWrappedInFunctions(item))
return true;
}
}
/// The tuple itself may be part of the primary key, so check that as a last resort.
if (isPrimaryOrMinMaxKeyColumnPossiblyWrappedInFunctions(left_in_operand, metadata_snapshot))
return true;
for (const auto & projection : metadata_snapshot->getProjections())
{
if (projection.isPrimaryKeyColumnPossiblyWrappedInFunctions(left_in_operand))
return true;
}
return false;
}
else
{
for (const auto & index : metadata_snapshot->getSecondaryIndices())
if (index_wrapper_factory.get(index)->mayBenefitFromIndexForIn(left_in_operand))
if (check_for_one_argument(item))
return true;
for (const auto & projection : metadata_snapshot->getProjections())
{
if (projection.isPrimaryKeyColumnPossiblyWrappedInFunctions(left_in_operand))
return true;
}
return isPrimaryOrMinMaxKeyColumnPossiblyWrappedInFunctions(left_in_operand, metadata_snapshot);
/// The tuple itself may be part of the primary key
/// or skip index, so check that as a last resort.
}
return check_for_one_argument(left_in_operand);
}
using PartitionIdToMaxBlock = std::unordered_map<String, Int64>;

View File

@ -3,7 +3,7 @@
#include <Storages/MergeTree/IDataPartStorage.h>
#include <Storages/MarkCache.h>
#include <IO/ReadSettings.h>
#include <Common/ThreadPool.h>
#include <Common/ThreadPool_fwd.h>
namespace DB

View File

@ -6,6 +6,7 @@
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Interpreters/Context.h>
#include <Common/ElapsedTimeProfileEventIncrement.h>
#include <IO/Operators.h>
#include <base/getThreadId.h>

View File

@ -1,10 +1,11 @@
#pragma once
#include <Common/ThreadPool.h>
#include <Common/ThreadPool_fwd.h>
#include <Interpreters/ExpressionActionsSettings.h>
#include <Storages/MergeTree/MergeTreeReadPool.h>
#include <Storages/MergeTree/MergeTreeIOSettings.h>
#include <IO/AsyncReadCounters.h>
#include <boost/heap/priority_queue.hpp>
#include <queue>
namespace DB

View File

@ -1147,7 +1147,8 @@ void ReplicatedMergeTreeQueue::removePartProducingOpsInRange(
*it, /* is_successful = */ false,
min_unprocessed_insert_time_changed, max_processed_insert_time_changed, lock);
(*it)->removed_by_other_entry = true;
LogEntryPtr removing_entry = std::move(*it); /// Make it live a bit longer
removing_entry->removed_by_other_entry = true;
it = queue.erase(it);
notifySubscribers(queue.size(), &znode_name);
++removed_entries;
@ -2491,6 +2492,7 @@ ReplicatedMergeTreeQueue::addSubscriber(ReplicatedMergeTreeQueue::SubscriberCall
|| std::find(lightweight_entries.begin(), lightweight_entries.end(), entry->type) != lightweight_entries.end())
out_entry_names.insert(entry->znode_name);
}
LOG_TEST(log, "Waiting for {} entries to be processed: {}", out_entry_names.size(), fmt::join(out_entry_names, ", "));
}
auto it = subscribers.emplace(subscribers.end(), std::move(callback));

View File

@ -4,6 +4,7 @@
#include <Storages/IStorage.h>
#include <Interpreters/IExternalLoaderConfigRepository.h>
#include <base/scope_guard.h>
namespace DB

View File

@ -7,7 +7,6 @@
#include <Poco/MongoDB/Connection.h>
#include <Poco/MongoDB/Cursor.h>
#include <Poco/MongoDB/Database.h>
#include <Poco/Version.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Core/Settings.h>
#include <Interpreters/Context.h>

View File

@ -2209,35 +2209,43 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
/// Check that we could cover whole range
for (PartDescriptionPtr & part_desc : parts_to_add)
{
if (adding_parts_active_set.getContainingPart(part_desc->new_part_info).empty())
if (!adding_parts_active_set.getContainingPart(part_desc->new_part_info).empty())
continue;
MergeTreePartInfo covering_drop_range;
if (queue.isGoingToBeDropped(part_desc->new_part_info, &covering_drop_range))
{
/// We should enqueue missing part for check, so it will be replaced with empty one (if needed)
/// and we will be able to execute this REPLACE_RANGE.
/// However, it's quite dangerous, because part may appear in source table.
/// So we enqueue it for check only if no replicas of source table have part either.
bool need_check = true;
if (auto * replicated_src_table = typeid_cast<StorageReplicatedMergeTree *>(source_table.get()))
{
String src_replica = replicated_src_table->findReplicaHavingPart(part_desc->src_part_name, false);
if (!src_replica.empty())
{
LOG_DEBUG(log, "Found part {} on replica {} of source table, will not check part {} required for {}",
part_desc->src_part_name, src_replica, part_desc->new_part_name, entry.znode_name);
need_check = false;
}
}
if (need_check)
{
LOG_DEBUG(log, "Will check part {} required for {}, because no replicas have it (including replicas of source table)",
part_desc->new_part_name, entry.znode_name);
enqueuePartForCheck(part_desc->new_part_name);
}
throw Exception(ErrorCodes::NO_REPLICA_HAS_PART,
"Not found part {} (or part covering it) neither source table neither remote replicas",
part_desc->new_part_name);
LOG_WARNING(log, "Will not add part {} (while replacing {}) because it's going to be dropped (DROP_RANGE: {})",
part_desc->new_part_name, entry_replace.drop_range_part_name, covering_drop_range.getPartNameForLogs());
continue;
}
/// We should enqueue missing part for check, so it will be replaced with empty one (if needed)
/// and we will be able to execute this REPLACE_RANGE.
/// However, it's quite dangerous, because part may appear in source table.
/// So we enqueue it for check only if no replicas of source table have part either.
bool need_check = true;
if (auto * replicated_src_table = typeid_cast<StorageReplicatedMergeTree *>(source_table.get()))
{
String src_replica = replicated_src_table->findReplicaHavingPart(part_desc->src_part_name, false);
if (!src_replica.empty())
{
LOG_DEBUG(log, "Found part {} on replica {} of source table, will not check part {} required for {}",
part_desc->src_part_name, src_replica, part_desc->new_part_name, entry.znode_name);
need_check = false;
}
}
if (need_check)
{
LOG_DEBUG(log, "Will check part {} required for {}, because no replicas have it (including replicas of source table)",
part_desc->new_part_name, entry.znode_name);
enqueuePartForCheck(part_desc->new_part_name);
}
throw Exception(ErrorCodes::NO_REPLICA_HAS_PART,
"Not found part {} (or part covering it) neither source table neither remote replicas",
part_desc->new_part_name);
}
/// Filter covered parts
@ -7616,7 +7624,6 @@ bool StorageReplicatedMergeTree::waitForProcessingQueue(UInt64 max_wait_millisec
if (removed_log_entry_id)
wait_for_ids.erase(*removed_log_entry_id);
chassert(new_queue_size || wait_for_ids.empty());
if (wait_for_ids.empty())
target_entry_event.set();
};

View File

@ -6,7 +6,8 @@
#include <re2/re2.h>
#include <boost/algorithm/string.hpp>
#include <filesystem>
#include "Poco/File.h"
#include <base/scope_guard.h>
#include <Poco/File.h>
#if USE_SSL
#include <openssl/x509v3.h>
#include "Poco/Net/SSLManager.h"

View File

@ -59,6 +59,8 @@ def test_create_insert(started_cluster):
node2.query("INSERT INTO tbl VALUES (1, 'str1')") # Test deduplication
node3.query("INSERT INTO tbl VALUES (2, 'str2')")
node1.query("SYSTEM SYNC REPLICA ON CLUSTER 'test_cluster' tbl")
for node in [node1, node2, node3]:
expected = [[1, "str1"], [2, "str2"]]
assert node.query("SELECT * FROM tbl ORDER BY id") == TSV(expected)

View File

@ -0,0 +1,17 @@
<test>
<create_query>
CREATE TABLE test_in_skip_idx
(
a UInt64,
s String,
INDEX idx s TYPE bloom_filter GRANULARITY 1
)
ENGINE = MergeTree() ORDER BY a
</create_query>
<fill_query>INSERT INTO test_in_skip_idx SELECT number, number FROM numbers(10000000)</fill_query>
<fill_query>OPTIMIZE TABLE test_in_skip_idx FINAL</fill_query>
<query>SELECT count() FROM test_in_skip_idx WHERE s IN (SELECT toString(number + 10000000) FROM numbers(100000)) SETTINGS use_skip_indexes = 0</query>
<drop_query>DROP TABLE IF EXISTS test_in_skip_idx</drop_query>
</test>

View File

@ -12,7 +12,7 @@ DETACH TABLE rmt1;
ATTACH TABLE rmt1;
SHOW CREATE TABLE rmt1;
CREATE TABLE rmt (n UInt64, s String) ENGINE = ReplicatedMergeTree('{default_path_test}{uuid}', '{default_name_test}') ORDER BY n; -- { serverError 62 }
CREATE TABLE rmt (n UInt64, s String) ENGINE = ReplicatedMergeTree('{default_path_test}{uuid}', '{default_name_test}') ORDER BY n; -- { serverError 36 }
CREATE TABLE rmt (n UInt64, s String) ENGINE = ReplicatedMergeTree('{default_path_test}test_01148', '{default_name_test}') ORDER BY n;
SHOW CREATE TABLE rmt;
RENAME TABLE rmt TO rmt2; -- { serverError 48 }
@ -24,7 +24,7 @@ SET distributed_ddl_output_mode='none';
DROP DATABASE IF EXISTS test_01148_atomic;
CREATE DATABASE test_01148_atomic ENGINE=Atomic;
CREATE TABLE test_01148_atomic.rmt2 ON CLUSTER test_shard_localhost (n int, PRIMARY KEY n) ENGINE=ReplicatedMergeTree;
CREATE TABLE test_01148_atomic.rmt3 AS test_01148_atomic.rmt2; -- { serverError 62 }
CREATE TABLE test_01148_atomic.rmt3 AS test_01148_atomic.rmt2; -- { serverError 36 }
CREATE TABLE test_01148_atomic.rmt4 ON CLUSTER test_shard_localhost AS test_01148_atomic.rmt2;
SHOW CREATE TABLE test_01148_atomic.rmt2;
RENAME TABLE test_01148_atomic.rmt4 to test_01148_atomic.rmt3;

View File

@ -1,11 +1,5 @@
#!/usr/bin/env bash
# Tags: no-tsan, no-asan, no-ubsan, no-msan, no-parallel, no-fasttest
# Tag no-tsan: requires jemalloc to track small allocations
# Tag no-asan: requires jemalloc to track small allocations
# Tag no-ubsan: requires jemalloc to track small allocations
# Tag no-msan: requires jemalloc to track small allocations
# Tags: no-parallel, no-fasttest
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh

View File

@ -389,6 +389,7 @@ javaHashUTF16LE
joinGet
joinGetOrNull
jumpConsistentHash
kafkaMurmurHash
kostikConsistentHash
lcm
least

View File

@ -0,0 +1,5 @@
1173551340
1357151166
1161502112
661178819
2088585677

View File

@ -0,0 +1,8 @@
-- Test are taken from: https://github.com/apache/kafka/blob/139f7709bd3f5926901a21e55043388728ccca78/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java#L93
-- and the reference is generated with: https://pastila.nl/?06465d36/87f8ab2c9f6501c54f1c0879a13c8626
SELECT kafkaMurmurHash('21');
SELECT kafkaMurmurHash('foobar');
SELECT kafkaMurmurHash('a-little-bit-long-string');
SELECT kafkaMurmurHash('a-little-bit-longer-string');
SELECT kafkaMurmurHash('lkjh234lh9fiuh90y23oiuhsafujhadof229phr9h19h89h8');

Some files were not shown because too many files have changed in this diff Show More