Merge remote-tracking branch 'origin/master' into opt_if_map

This commit is contained in:
taiyang-li 2024-02-05 10:25:00 +08:00
commit 4fc9eeddc9
79 changed files with 2444 additions and 705 deletions

View File

@ -35,7 +35,7 @@ jobs:
- name: PrepareRunConfig
id: runconfig
run: |
python3 "$GITHUB_WORKSPACE/tests/ci/ci.py" --configure --rebuild-all-binaries --outfile ${{ runner.temp }}/ci_run_data.json
python3 "$GITHUB_WORKSPACE/tests/ci/ci.py" --configure --outfile ${{ runner.temp }}/ci_run_data.json
echo "::group::CI configuration"
python3 -m json.tool ${{ runner.temp }}/ci_run_data.json
@ -319,22 +319,15 @@ jobs:
run_command: |
python3 build_report_check.py "$CHECK_NAME"
MarkReleaseReady:
needs: [RunConfig, BuilderBinDarwin, BuilderBinDarwinAarch64, BuilderDebRelease, BuilderDebAarch64]
if: ${{ !failure() && !cancelled() }}
needs:
- BuilderBinDarwin
- BuilderBinDarwinAarch64
- BuilderDebRelease
- BuilderDebAarch64
runs-on: [self-hosted, style-checker]
steps:
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
clear-repository: true
- name: Mark Commit Release Ready
run: |
cd "$GITHUB_WORKSPACE/tests/ci"
python3 mark_release_ready.py
uses: ./.github/workflows/reusable_test.yml
with:
test_name: Mark Commit Release Ready
runner_type: style-checker
data: ${{ needs.RunConfig.outputs.data }}
run_command: |
python3 mark_release_ready.py
############################################################################################
#################################### INSTALL PACKAGES ######################################
############################################################################################

View File

@ -491,11 +491,11 @@ jobs:
run_command: |
TEMP_PATH="${TEMP_PATH}/integration" \
python3 integration_test_check.py "Integration $CHECK_NAME" \
--validate-bugfix --post-commit-status=file || echo 'ignore exit code'
--validate-bugfix --post-commit-status=file || echo "ignore exit code"
TEMP_PATH="${TEMP_PATH}/stateless" \
python3 functional_test_check.py "Stateless $CHECK_NAME" "$KILL_TIMEOUT" \
--validate-bugfix --post-commit-status=file || echo 'ignore exit code'
--validate-bugfix --post-commit-status=file || echo "ignore exit code"
python3 bugfix_validate_check.py "${TEMP_PATH}/stateless/functional_commit_status.tsv" "${TEMP_PATH}/integration/integration_commit_status.tsv"
##############################################################################################

View File

@ -41,7 +41,7 @@ jobs:
id: runconfig
run: |
echo "::group::configure CI run"
python3 "$GITHUB_WORKSPACE/tests/ci/ci.py" --configure --rebuild-all-binaries --outfile ${{ runner.temp }}/ci_run_data.json
python3 "$GITHUB_WORKSPACE/tests/ci/ci.py" --configure --outfile ${{ runner.temp }}/ci_run_data.json
echo "::endgroup::"
echo "::group::CI run configure results"
python3 -m json.tool ${{ runner.temp }}/ci_run_data.json

View File

@ -55,7 +55,7 @@ jobs:
python3 ./utils/security-generator/generate_security.py > SECURITY.md
git diff HEAD
- name: Create Pull Request
uses: peter-evans/create-pull-request@v3
uses: peter-evans/create-pull-request@v6
with:
author: "robot-clickhouse <robot-clickhouse@users.noreply.github.com>"
token: ${{ secrets.ROBOT_CLICKHOUSE_COMMIT_TOKEN }}

View File

@ -166,12 +166,6 @@ set (SRCS
)
add_library (_poco_foundation ${SRCS})
target_link_libraries (_poco_foundation
PUBLIC
boost::headers_only
boost::system
)
add_library (Poco::Foundation ALIAS _poco_foundation)
# TODO: remove these warning exclusions

View File

@ -22,9 +22,6 @@
#include <cstddef>
#include <map>
#include <vector>
#include <boost/smart_ptr/intrusive_ptr.hpp>
#include "Poco/Channel.h"
#include "Poco/Format.h"
#include "Poco/Foundation.h"
@ -37,7 +34,7 @@ namespace Poco
class Exception;
class Logger;
using LoggerPtr = boost::intrusive_ptr<Logger>;
using LoggerPtr = std::shared_ptr<Logger>;
class Foundation_API Logger : public Channel
/// Logger is a special Channel that acts as the main
@ -874,11 +871,21 @@ public:
/// If the Logger does not yet exist, it is created, based
/// on its parent logger.
static LoggerPtr getShared(const std::string & name, bool should_be_owned_by_shared_ptr_if_created = true);
static LoggerPtr getShared(const std::string & name);
/// Returns a shared pointer to the Logger with the given name.
/// If the Logger does not yet exist, it is created, based
/// on its parent logger.
static Logger & unsafeGet(const std::string & name);
/// Returns a reference to the Logger with the given name.
/// If the Logger does not yet exist, it is created, based
/// on its parent logger.
///
/// WARNING: This method is not thread safe. You should
/// probably use get() instead.
/// The only time this method should be used is during
/// program initialization, when only one thread is running.
static Logger & create(const std::string & name, Channel * pChannel, int level = Message::PRIO_INFORMATION);
/// Creates and returns a reference to a Logger with the
/// given name. The Logger's Channel and log level as set as
@ -925,16 +932,6 @@ public:
static const std::string ROOT; /// The name of the root logger ("").
public:
struct LoggerEntry
{
Poco::Logger * logger;
bool owned_by_shared_ptr = false;
};
using LoggerMap = std::unordered_map<std::string, LoggerEntry>;
using LoggerMapIterator = LoggerMap::iterator;
protected:
Logger(const std::string & name, Channel * pChannel, int level);
~Logger();
@ -943,19 +940,12 @@ protected:
void log(const std::string & text, Message::Priority prio, const char * file, int line);
static std::string format(const std::string & fmt, int argc, std::string argv[]);
static Logger & unsafeCreate(const std::string & name, Channel * pChannel, int level = Message::PRIO_INFORMATION);
static Logger & parent(const std::string & name);
static void add(Logger * pLogger);
static Logger * find(const std::string & name);
private:
static std::pair<Logger::LoggerMapIterator, bool> unsafeGet(const std::string & name, bool get_shared);
static Logger * unsafeGetRawPtr(const std::string & name);
static std::pair<LoggerMapIterator, bool> unsafeCreate(const std::string & name, Channel * pChannel, int level = Message::PRIO_INFORMATION);
static Logger & parent(const std::string & name);
static std::pair<LoggerMapIterator, bool> add(Logger * pLogger);
static std::optional<LoggerMapIterator> find(const std::string & name);
static Logger * findRawPtr(const std::string & name);
friend void intrusive_ptr_add_ref(Logger * ptr);
friend void intrusive_ptr_release(Logger * ptr);
Logger();
Logger(const Logger &);
Logger & operator=(const Logger &);

View File

@ -53,10 +53,11 @@ protected:
virtual ~RefCountedObject();
/// Destroys the RefCountedObject.
mutable std::atomic<size_t> _counter;
private:
RefCountedObject(const RefCountedObject &);
RefCountedObject & operator=(const RefCountedObject &);
mutable std::atomic<size_t> _counter;
};

View File

@ -38,7 +38,14 @@ std::mutex & getLoggerMutex()
return *logger_mutex;
}
Poco::Logger::LoggerMap * _pLoggerMap = nullptr;
struct LoggerEntry
{
Poco::Logger * logger;
bool owned_by_shared_ptr = false;
};
using LoggerMap = std::unordered_map<std::string, LoggerEntry>;
LoggerMap * _pLoggerMap = nullptr;
}
@ -302,9 +309,38 @@ void Logger::formatDump(std::string& message, const void* buffer, std::size_t le
namespace
{
struct LoggerDeleter
{
void operator()(Poco::Logger * logger)
{
std::lock_guard<std::mutex> lock(getLoggerMutex());
/// If logger infrastructure is destroyed just decrement logger reference count
if (!_pLoggerMap)
{
logger->release();
return;
}
auto it = _pLoggerMap->find(logger->name());
assert(it != _pLoggerMap->end());
/** If reference count is 1, this means this shared pointer owns logger
* and need destroy it.
*/
size_t reference_count_before_release = logger->release();
if (reference_count_before_release == 1)
{
assert(it->second.owned_by_shared_ptr);
_pLoggerMap->erase(it);
}
}
};
inline LoggerPtr makeLoggerPtr(Logger & logger)
{
return LoggerPtr(&logger, false /*add_ref*/);
return std::shared_ptr<Logger>(&logger, LoggerDeleter());
}
}
@ -314,87 +350,64 @@ Logger& Logger::get(const std::string& name)
{
std::lock_guard<std::mutex> lock(getLoggerMutex());
auto [it, inserted] = unsafeGet(name, false /*get_shared*/);
return *it->second.logger;
Logger & logger = unsafeGet(name);
/** If there are already shared pointer created for this logger
* we need to increment Logger reference count and now logger
* is owned by logger infrastructure.
*/
auto it = _pLoggerMap->find(name);
if (it->second.owned_by_shared_ptr)
{
it->second.logger->duplicate();
it->second.owned_by_shared_ptr = false;
}
return logger;
}
LoggerPtr Logger::getShared(const std::string & name, bool should_be_owned_by_shared_ptr_if_created)
LoggerPtr Logger::getShared(const std::string & name)
{
std::lock_guard<std::mutex> lock(getLoggerMutex());
auto [it, inserted] = unsafeGet(name, true /*get_shared*/);
bool logger_exists = _pLoggerMap && _pLoggerMap->contains(name);
/** If during `unsafeGet` logger was created, then this shared pointer owns it.
* If logger was already created, then this shared pointer does not own it.
Logger & logger = unsafeGet(name);
/** If logger already exists, then this shared pointer does not own it.
* If logger does not exists, logger infrastructure could be already destroyed
* or logger was created.
*/
if (inserted)
if (logger_exists)
{
if (should_be_owned_by_shared_ptr_if_created)
it->second.owned_by_shared_ptr = true;
else
it->second.logger->duplicate();
logger.duplicate();
}
else if (_pLoggerMap)
{
_pLoggerMap->find(name)->second.owned_by_shared_ptr = true;
}
return makeLoggerPtr(*it->second.logger);
return makeLoggerPtr(logger);
}
std::pair<Logger::LoggerMapIterator, bool> Logger::unsafeGet(const std::string& name, bool get_shared)
Logger& Logger::unsafeGet(const std::string& name)
{
std::optional<Logger::LoggerMapIterator> optional_logger_it = find(name);
bool should_recreate_logger = false;
if (optional_logger_it)
Logger* pLogger = find(name);
if (!pLogger)
{
auto & logger_it = *optional_logger_it;
std::optional<size_t> reference_count_before;
if (get_shared)
{
reference_count_before = logger_it->second.logger->duplicate();
}
else if (logger_it->second.owned_by_shared_ptr)
{
reference_count_before = logger_it->second.logger->duplicate();
logger_it->second.owned_by_shared_ptr = false;
}
/// Other thread already decided to delete this logger, but did not yet remove it from map
if (reference_count_before && reference_count_before == 0)
should_recreate_logger = true;
}
if (!optional_logger_it || should_recreate_logger)
{
Logger * logger = nullptr;
if (name == ROOT)
{
logger = new Logger(name, nullptr, Message::PRIO_INFORMATION);
pLogger = new Logger(name, 0, Message::PRIO_INFORMATION);
}
else
{
Logger& par = parent(name);
logger = new Logger(name, par.getChannel(), par.getLevel());
pLogger = new Logger(name, par.getChannel(), par.getLevel());
}
if (should_recreate_logger)
{
(*optional_logger_it)->second.logger = logger;
return std::make_pair(*optional_logger_it, true);
}
return add(logger);
add(pLogger);
}
return std::make_pair(*optional_logger_it, false);
}
Logger * Logger::unsafeGetRawPtr(const std::string & name)
{
return unsafeGet(name, false /*get_shared*/).first->second.logger;
return *pLogger;
}
@ -402,24 +415,24 @@ Logger& Logger::create(const std::string& name, Channel* pChannel, int level)
{
std::lock_guard<std::mutex> lock(getLoggerMutex());
return *unsafeCreate(name, pChannel, level).first->second.logger;
return unsafeCreate(name, pChannel, level);
}
LoggerPtr Logger::createShared(const std::string & name, Channel * pChannel, int level)
{
std::lock_guard<std::mutex> lock(getLoggerMutex());
auto [it, inserted] = unsafeCreate(name, pChannel, level);
it->second.owned_by_shared_ptr = true;
Logger & logger = unsafeCreate(name, pChannel, level);
_pLoggerMap->find(name)->second.owned_by_shared_ptr = true;
return makeLoggerPtr(*it->second.logger);
return makeLoggerPtr(logger);
}
Logger& Logger::root()
{
std::lock_guard<std::mutex> lock(getLoggerMutex());
return *unsafeGetRawPtr(ROOT);
return unsafeGet(ROOT);
}
@ -427,11 +440,7 @@ Logger* Logger::has(const std::string& name)
{
std::lock_guard<std::mutex> lock(getLoggerMutex());
auto optional_it = find(name);
if (!optional_it)
return nullptr;
return (*optional_it)->second.logger;
return find(name);
}
@ -450,69 +459,20 @@ void Logger::shutdown()
}
delete _pLoggerMap;
_pLoggerMap = nullptr;
_pLoggerMap = 0;
}
}
std::optional<Logger::LoggerMapIterator> Logger::find(const std::string& name)
Logger* Logger::find(const std::string& name)
{
if (_pLoggerMap)
{
LoggerMap::iterator it = _pLoggerMap->find(name);
if (it != _pLoggerMap->end())
return it;
return {};
return it->second.logger;
}
return {};
}
Logger * Logger::findRawPtr(const std::string & name)
{
auto optional_it = find(name);
if (!optional_it)
return nullptr;
return (*optional_it)->second.logger;
}
void intrusive_ptr_add_ref(Logger * ptr)
{
ptr->duplicate();
}
void intrusive_ptr_release(Logger * ptr)
{
size_t reference_count_before = ptr->_counter.fetch_sub(1, std::memory_order_acq_rel);
if (reference_count_before != 1)
return;
{
std::lock_guard<std::mutex> lock(getLoggerMutex());
if (_pLoggerMap)
{
auto it = _pLoggerMap->find(ptr->name());
/** It is possible that during release other thread created logger and
* updated iterator in map.
*/
if (it != _pLoggerMap->end() && ptr == it->second.logger)
{
/** If reference count is 0, this means this intrusive pointer owns logger
* and need destroy it.
*/
assert(it->second.owned_by_shared_ptr);
_pLoggerMap->erase(it);
}
}
}
delete ptr;
return 0;
}
@ -530,14 +490,14 @@ void Logger::names(std::vector<std::string>& names)
}
}
std::pair<Logger::LoggerMapIterator, bool> Logger::unsafeCreate(const std::string & name, Channel * pChannel, int level)
Logger& Logger::unsafeCreate(const std::string & name, Channel * pChannel, int level)
{
if (find(name)) throw ExistsException();
Logger* pLogger = new Logger(name, pChannel, level);
return add(pLogger);
}
add(pLogger);
return *pLogger;
}
Logger& Logger::parent(const std::string& name)
{
@ -545,13 +505,13 @@ Logger& Logger::parent(const std::string& name)
if (pos != std::string::npos)
{
std::string pname = name.substr(0, pos);
Logger* pParent = findRawPtr(pname);
Logger* pParent = find(pname);
if (pParent)
return *pParent;
else
return parent(pname);
}
else return *unsafeGetRawPtr(ROOT);
else return unsafeGet(ROOT);
}
@ -619,14 +579,12 @@ namespace
}
std::pair<Logger::LoggerMapIterator, bool> Logger::add(Logger* pLogger)
void Logger::add(Logger* pLogger)
{
if (!_pLoggerMap)
_pLoggerMap = new Logger::LoggerMap;
_pLoggerMap = new LoggerMap;
auto result = _pLoggerMap->emplace(pLogger->name(), LoggerEntry{pLogger, false /*owned_by_shared_ptr*/});
assert(result.second);
return result;
_pLoggerMap->emplace(pLogger->name(), LoggerEntry{pLogger, false /*owned_by_shared_ptr*/});
}

View File

@ -34,7 +34,7 @@ RUN arch=${TARGETARCH:-amd64} \
# lts / testing / prestable / etc
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="https://packages.clickhouse.com/tgz/${REPO_CHANNEL}"
ARG VERSION="24.1.1.2048"
ARG VERSION="24.1.2.5"
ARG PACKAGES="clickhouse-keeper"
ARG DIRECT_DOWNLOAD_URLS=""

View File

@ -32,7 +32,7 @@ RUN arch=${TARGETARCH:-amd64} \
# lts / testing / prestable / etc
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="https://packages.clickhouse.com/tgz/${REPO_CHANNEL}"
ARG VERSION="24.1.1.2048"
ARG VERSION="24.1.2.5"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
ARG DIRECT_DOWNLOAD_URLS=""

View File

@ -30,7 +30,7 @@ RUN sed -i "s|http://archive.ubuntu.com|${apt_archive}|g" /etc/apt/sources.list
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="deb [signed-by=/usr/share/keyrings/clickhouse-keyring.gpg] https://packages.clickhouse.com/deb ${REPO_CHANNEL} main"
ARG VERSION="24.1.1.2048"
ARG VERSION="24.1.2.5"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
# set non-empty deb_location_url url to create a docker image

View File

@ -0,0 +1,31 @@
---
sidebar_position: 1
sidebar_label: 2024
---
# 2024 Changelog
### ClickHouse release v23.11.5.29-stable (d83b108deca) FIXME as compared to v23.11.4.24-stable (e79d840d7fe)
#### Improvement
* Backported in [#58815](https://github.com/ClickHouse/ClickHouse/issues/58815): Add `SYSTEM JEMALLOC PURGE` for purging unused jemalloc pages, `SYSTEM JEMALLOC [ ENABLE | DISABLE | FLUSH ] PROFILE` for controlling jemalloc profile if the profiler is enabled. Add jemalloc-related 4LW command in Keeper: `jmst` for dumping jemalloc stats, `jmfp`, `jmep`, `jmdp` for controlling jemalloc profile if the profiler is enabled. [#58665](https://github.com/ClickHouse/ClickHouse/pull/58665) ([Antonio Andelic](https://github.com/antonio2368)).
* Backported in [#59234](https://github.com/ClickHouse/ClickHouse/issues/59234): Allow to ignore schema evolution in Iceberg table engine and read all data using schema specified by the user on table creation or latest schema parsed from metadata on table creation. This is done under a setting `iceberg_engine_ignore_schema_evolution` that is disabled by default. Note that enabling this setting can lead to incorrect result as in case of evolved schema all data files will be read using the same schema. [#59133](https://github.com/ClickHouse/ClickHouse/pull/59133) ([Kruglov Pavel](https://github.com/Avogar)).
#### Bug Fix (user-visible misbehavior in an official stable release)
* Fix a stupid case of intersecting parts [#58482](https://github.com/ClickHouse/ClickHouse/pull/58482) ([Alexander Tokmakov](https://github.com/tavplubix)).
* Fix stream partitioning in parallel window functions [#58739](https://github.com/ClickHouse/ClickHouse/pull/58739) ([Dmitry Novik](https://github.com/novikd)).
* Fix double destroy call on exception throw in addBatchLookupTable8 [#58745](https://github.com/ClickHouse/ClickHouse/pull/58745) ([Raúl Marín](https://github.com/Algunenano)).
* Fix JSONExtract function for LowCardinality(Nullable) columns [#58808](https://github.com/ClickHouse/ClickHouse/pull/58808) ([vdimir](https://github.com/vdimir)).
* Fix: LIMIT BY and LIMIT in distributed query [#59153](https://github.com/ClickHouse/ClickHouse/pull/59153) ([Igor Nikonov](https://github.com/devcrafter)).
* Fix not-ready set for system.tables [#59351](https://github.com/ClickHouse/ClickHouse/pull/59351) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
* Fix translate() with FixedString input [#59356](https://github.com/ClickHouse/ClickHouse/pull/59356) ([Raúl Marín](https://github.com/Algunenano)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* refine error message [#57991](https://github.com/ClickHouse/ClickHouse/pull/57991) ([Han Fei](https://github.com/hanfei1991)).
* Fix rare race in external sort/aggregation with temporary data in cache [#58013](https://github.com/ClickHouse/ClickHouse/pull/58013) ([Anton Popov](https://github.com/CurtizJ)).
* Follow-up to [#58482](https://github.com/ClickHouse/ClickHouse/issues/58482) [#58574](https://github.com/ClickHouse/ClickHouse/pull/58574) ([Alexander Tokmakov](https://github.com/tavplubix)).
* Fix possible race in ManyAggregatedData dtor. [#58624](https://github.com/ClickHouse/ClickHouse/pull/58624) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
* Decrease log level for one log message [#59168](https://github.com/ClickHouse/ClickHouse/pull/59168) ([Kseniia Sumarokova](https://github.com/kssenii)).

View File

@ -0,0 +1,36 @@
---
sidebar_position: 1
sidebar_label: 2024
---
# 2024 Changelog
### ClickHouse release v23.12.3.40-stable (a594704ae75) FIXME as compared to v23.12.2.59-stable (17ab210e761)
#### Improvement
* Backported in [#58660](https://github.com/ClickHouse/ClickHouse/issues/58660): When executing some queries, which require a lot of streams for reading data, the error `"Paste JOIN requires sorted tables only"` was previously thrown. Now the numbers of streams resize to 1 in that case. [#58608](https://github.com/ClickHouse/ClickHouse/pull/58608) ([Yarik Briukhovetskyi](https://github.com/yariks5s)).
* Backported in [#58817](https://github.com/ClickHouse/ClickHouse/issues/58817): Add `SYSTEM JEMALLOC PURGE` for purging unused jemalloc pages, `SYSTEM JEMALLOC [ ENABLE | DISABLE | FLUSH ] PROFILE` for controlling jemalloc profile if the profiler is enabled. Add jemalloc-related 4LW command in Keeper: `jmst` for dumping jemalloc stats, `jmfp`, `jmep`, `jmdp` for controlling jemalloc profile if the profiler is enabled. [#58665](https://github.com/ClickHouse/ClickHouse/pull/58665) ([Antonio Andelic](https://github.com/antonio2368)).
* Backported in [#59235](https://github.com/ClickHouse/ClickHouse/issues/59235): Allow to ignore schema evolution in Iceberg table engine and read all data using schema specified by the user on table creation or latest schema parsed from metadata on table creation. This is done under a setting `iceberg_engine_ignore_schema_evolution` that is disabled by default. Note that enabling this setting can lead to incorrect result as in case of evolved schema all data files will be read using the same schema. [#59133](https://github.com/ClickHouse/ClickHouse/pull/59133) ([Kruglov Pavel](https://github.com/Avogar)).
#### Bug Fix (user-visible misbehavior in an official stable release)
* Delay reading from StorageKafka to allow multiple reads in materialized views [#58477](https://github.com/ClickHouse/ClickHouse/pull/58477) ([János Benjamin Antal](https://github.com/antaljanosbenjamin)).
* Fix a stupid case of intersecting parts [#58482](https://github.com/ClickHouse/ClickHouse/pull/58482) ([Alexander Tokmakov](https://github.com/tavplubix)).
* Disable max_joined_block_rows in ConcurrentHashJoin [#58595](https://github.com/ClickHouse/ClickHouse/pull/58595) ([vdimir](https://github.com/vdimir)).
* Fix stream partitioning in parallel window functions [#58739](https://github.com/ClickHouse/ClickHouse/pull/58739) ([Dmitry Novik](https://github.com/novikd)).
* Fix double destroy call on exception throw in addBatchLookupTable8 [#58745](https://github.com/ClickHouse/ClickHouse/pull/58745) ([Raúl Marín](https://github.com/Algunenano)).
* Fix JSONExtract function for LowCardinality(Nullable) columns [#58808](https://github.com/ClickHouse/ClickHouse/pull/58808) ([vdimir](https://github.com/vdimir)).
* Multiple read file log storage in mv [#58877](https://github.com/ClickHouse/ClickHouse/pull/58877) ([János Benjamin Antal](https://github.com/antaljanosbenjamin)).
* Fix: LIMIT BY and LIMIT in distributed query [#59153](https://github.com/ClickHouse/ClickHouse/pull/59153) ([Igor Nikonov](https://github.com/devcrafter)).
* Fix not-ready set for system.tables [#59351](https://github.com/ClickHouse/ClickHouse/pull/59351) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
* Fix translate() with FixedString input [#59356](https://github.com/ClickHouse/ClickHouse/pull/59356) ([Raúl Marín](https://github.com/Algunenano)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Follow-up to [#58482](https://github.com/ClickHouse/ClickHouse/issues/58482) [#58574](https://github.com/ClickHouse/ClickHouse/pull/58574) ([Alexander Tokmakov](https://github.com/tavplubix)).
* Fix possible race in ManyAggregatedData dtor. [#58624](https://github.com/ClickHouse/ClickHouse/pull/58624) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
* Change log level for super imporant message in Keeper [#59010](https://github.com/ClickHouse/ClickHouse/pull/59010) ([alesapin](https://github.com/alesapin)).
* Decrease log level for one log message [#59168](https://github.com/ClickHouse/ClickHouse/pull/59168) ([Kseniia Sumarokova](https://github.com/kssenii)).
* Fix fasttest by pinning pip dependencies [#59256](https://github.com/ClickHouse/ClickHouse/pull/59256) ([Azat Khuzhin](https://github.com/azat)).
* No debug symbols in Rust [#59306](https://github.com/ClickHouse/ClickHouse/pull/59306) ([Alexey Milovidov](https://github.com/alexey-milovidov)).

View File

@ -0,0 +1,14 @@
---
sidebar_position: 1
sidebar_label: 2024
---
# 2024 Changelog
### ClickHouse release v24.1.2.5-stable (b2605dd4a5a) FIXME as compared to v24.1.1.2048-stable (5a024dfc093)
#### Bug Fix (user-visible misbehavior in an official stable release)
* Fix translate() with FixedString input [#59356](https://github.com/ClickHouse/ClickHouse/pull/59356) ([Raúl Marín](https://github.com/Algunenano)).
* Fix stacktraces for binaries without debug symbols [#59444](https://github.com/ClickHouse/ClickHouse/pull/59444) ([Azat Khuzhin](https://github.com/azat)).

View File

@ -1922,7 +1922,7 @@ Possible values:
- Positive integer.
- 0 — Asynchronous insertions are disabled.
Default value: `100000`.
Default value: `1000000`.
### async_insert_max_query_number {#async-insert-max-query-number}
@ -1935,7 +1935,7 @@ Possible values:
Default value: `450`.
### async_insert_busy_timeout_ms {#async-insert-busy-timeout-ms}
### async_insert_busy_timeout_max_ms {#async-insert-busy-timeout-max-ms}
The maximum timeout in milliseconds since the first `INSERT` query before inserting collected data.
@ -1946,6 +1946,61 @@ Possible values:
Default value: `200`.
### async_insert_poll_timeout_ms {#async-insert-poll-timeout-ms}
Timeout in milliseconds for polling data from asynchronous insert queue.
Possible values:
- Positive integer.
Default value: `10`.
### async_insert_use_adaptive_busy_timeout {#allow-experimental-async-insert-adaptive-busy-timeout}
Use adaptive asynchronous insert timeout.
Possible values:
- 0 - Disabled.
- 1 - Enabled.
Default value: `0`.
### async_insert_busy_timeout_min_ms {#async-insert-busy-timeout-min-ms}
If adaptive asynchronous insert timeout is allowed through [async_insert_use_adaptive_busy_timeout](#allow-experimental-async-insert-adaptive-busy-timeout), the setting specifies the minimum value of the asynchronous insert timeout in milliseconds. It also serves as the initial value, which may be increased later by the adaptive algorithm, up to the [async_insert_busy_timeout_ms](#async_insert_busy_timeout_ms).
Possible values:
- Positive integer.
Default value: `50`.
### async_insert_busy_timeout_ms {#async-insert-busy-timeout-ms}
Alias for [`async_insert_busy_timeout_max_ms`](#async_insert_busy_timeout_max_ms).
### async_insert_busy_timeout_increase_rate {#async-insert-busy-timeout-increase-rate}
If adaptive asynchronous insert timeout is allowed through [async_insert_use_adaptive_busy_timeout](#allow-experimental-async-insert-adaptive-busy-timeout), the setting specifies the exponential growth rate at which the adaptive asynchronous insert timeout increases.
Possible values:
- A positive floating-point number.
Default value: `0.2`.
### async_insert_busy_timeout_decrease_rate {#async-insert-busy-timeout-decrease-rate}
If adaptive asynchronous insert timeout is allowed through [async_insert_use_adaptive_busy_timeout](#allow-experimental-async-insert-adaptive-busy-timeout), the setting specifies the exponential growth rate at which the adaptive asynchronous insert timeout decreases.
Possible values:
- A positive floating-point number.
Default value: `0.2`.
### async_insert_stale_timeout_ms {#async-insert-stale-timeout-ms}
The maximum timeout in milliseconds since the last `INSERT` query before dumping collected data. If enabled, the settings prolongs the [async_insert_busy_timeout_ms](#async-insert-busy-timeout-ms) with every `INSERT` query as long as [async_insert_max_data_size](#async-insert-max-data-size) is not exceeded.
@ -5321,4 +5376,4 @@ Allow to ignore schema evolution in Iceberg table engine and read all data using
Enabling this setting can lead to incorrect result as in case of evolved schema all data files will be read using the same schema.
:::
Default value: 'false'.
Default value: 'false'.

View File

@ -1450,7 +1450,13 @@ window.onpopstate = function(event) {
if (window.location.hash) {
try {
let search_query_, customized_;
({host, user, queries, params, search_query_, customized_} = JSON.parse(LZString.decompressFromEncodedURIComponent(window.location.hash.substring(1))));
try {
({host, user, queries, params, search_query_, customized_} = JSON.parse(LZString.decompressFromEncodedURIComponent(window.location.hash.substring(1))));
} catch {
// For compatibility with uncompressed state
({host, user, queries, params, search_query_, customized_} = JSON.parse(atob(window.location.hash.substring(1))));
}
// For compatibility with old URLs' hashes
search_query = search_query_ !== undefined ? search_query_ : search_query;
customized = customized_ !== undefined ? customized_ : true;

View File

@ -1214,7 +1214,7 @@ private:
static void expandGroupByAll(QueryNode & query_tree_node_typed);
static void expandOrderByAll(QueryNode & query_tree_node_typed, const Settings & settings);
void expandOrderByAll(QueryNode & query_tree_node_typed, const Settings & settings);
static std::string
rewriteAggregateFunctionNameIfNeeded(const std::string & aggregate_function_name, NullsAction action, const ContextPtr & context);
@ -2349,15 +2349,18 @@ void QueryAnalyzer::expandOrderByAll(QueryNode & query_tree_node_typed, const Se
for (auto & node : projection_nodes)
{
if (auto * identifier_node = node->as<IdentifierNode>(); identifier_node != nullptr)
if (Poco::toUpper(identifier_node->getIdentifier().getFullName()) == "ALL" || Poco::toUpper(identifier_node->getAlias()) == "ALL")
throw Exception(ErrorCodes::UNEXPECTED_EXPRESSION,
"Cannot use ORDER BY ALL to sort a column with name 'all', please disable setting `enable_order_by_all` and try again");
if (auto * function_node = node->as<FunctionNode>(); function_node != nullptr)
if (Poco::toUpper(function_node->getAlias()) == "ALL")
auto resolved_expression_it = resolved_expressions.find(node);
if (resolved_expression_it != resolved_expressions.end())
{
auto projection_names = resolved_expression_it->second;
if (projection_names.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Expression nodes list expected 1 projection names. Actual {}",
projection_names.size());
if (Poco::toUpper(projection_names[0]) == "ALL")
throw Exception(ErrorCodes::UNEXPECTED_EXPRESSION,
"Cannot use ORDER BY ALL to sort a column with name 'all', please disable setting `enable_order_by_all` and try again");
}
auto sort_node = std::make_shared<SortNode>(node, all_node->getSortDirection(), all_node->getNullsSortDirection());
list_node->getNodes().push_back(sort_node);
@ -7180,8 +7183,6 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
if (query_node_typed.hasHaving() && query_node_typed.isGroupByWithTotals() && is_rollup_or_cube)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "WITH TOTALS and WITH ROLLUP or CUBE are not supported together in presence of HAVING");
expandOrderByAll(query_node_typed, settings);
/// Initialize aliases in query node scope
QueryExpressionsAliasVisitor visitor(scope);
@ -7368,6 +7369,7 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
if (settings.enable_positional_arguments)
replaceNodesWithPositionalArguments(query_node_typed.getOrderByNode(), query_node_typed.getProjection().getNodes(), scope);
expandOrderByAll(query_node_typed, settings);
resolveSortNodeList(query_node_typed.getOrderByNode(), scope);
}

View File

@ -6,7 +6,7 @@
#include <Parsers/ASTBackupQuery.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/TableLockHolder.h>
#include <Storages/MergeTree/ZooKeeperRetries.h>
#include <Common/ZooKeeper/ZooKeeperRetries.h>
#include <filesystem>
#include <queue>

View File

@ -1,6 +1,6 @@
#pragma once
#include <Storages/MergeTree/ZooKeeperRetries.h>
#include <Common/ZooKeeper/ZooKeeperRetries.h>
#include <Common/ZooKeeper/Common.h>
#include <Common/ZooKeeper/ZooKeeperWithFaultInjection.h>

View File

@ -22,12 +22,12 @@ namespace ErrorCodes
}
ConnectionEstablisher::ConnectionEstablisher(
IConnectionPool * pool_,
ConnectionPoolPtr pool_,
const ConnectionTimeouts * timeouts_,
const Settings & settings_,
LoggerPtr log_,
const QualifiedTableName * table_to_check_)
: pool(pool_), timeouts(timeouts_), settings(settings_), log(log_), table_to_check(table_to_check_), is_finished(false)
: pool(std::move(pool_)), timeouts(timeouts_), settings(settings_), log(log_), table_to_check(table_to_check_), is_finished(false)
{
}
@ -79,14 +79,13 @@ void ConnectionEstablisher::run(ConnectionEstablisher::TryResult & result, std::
return;
}
UInt32 delay = table_status_it->second.absolute_delay;
const UInt32 delay = table_status_it->second.absolute_delay;
if (delay < max_allowed_delay)
result.is_up_to_date = true;
else
{
result.is_up_to_date = false;
result.staleness = delay;
result.delay = delay;
LOG_TRACE(log, "Server {} has unacceptable replica delay for table {}.{}: {}", result.entry->getDescription(), table_to_check->database, table_to_check->table, delay);
ProfileEvents::increment(ProfileEvents::DistributedConnectionStaleReplica);
@ -111,12 +110,13 @@ void ConnectionEstablisher::run(ConnectionEstablisher::TryResult & result, std::
#if defined(OS_LINUX)
ConnectionEstablisherAsync::ConnectionEstablisherAsync(
IConnectionPool * pool_,
ConnectionPoolPtr pool_,
const ConnectionTimeouts * timeouts_,
const Settings & settings_,
LoggerPtr log_,
const QualifiedTableName * table_to_check_)
: AsyncTaskExecutor(std::make_unique<Task>(*this)), connection_establisher(pool_, timeouts_, settings_, log_, table_to_check_)
: AsyncTaskExecutor(std::make_unique<Task>(*this))
, connection_establisher(std::move(pool_), timeouts_, settings_, log_, table_to_check_)
{
epoll.add(timeout_descriptor.getDescriptor());
}

View File

@ -1,7 +1,5 @@
#pragma once
#include <variant>
#include <Common/AsyncTaskExecutor.h>
#include <Common/Epoll.h>
#include <Common/Fiber.h>
@ -20,7 +18,7 @@ class ConnectionEstablisher
public:
using TryResult = PoolWithFailoverBase<IConnectionPool>::TryResult;
ConnectionEstablisher(IConnectionPool * pool_,
ConnectionEstablisher(ConnectionPoolPtr pool_,
const ConnectionTimeouts * timeouts_,
const Settings & settings_,
LoggerPtr log,
@ -35,7 +33,7 @@ public:
bool isFinished() const { return is_finished; }
private:
IConnectionPool * pool;
ConnectionPoolPtr pool;
const ConnectionTimeouts * timeouts;
const Settings & settings;
LoggerPtr log;
@ -58,7 +56,7 @@ class ConnectionEstablisherAsync : public AsyncTaskExecutor
public:
using TryResult = PoolWithFailoverBase<IConnectionPool>::TryResult;
ConnectionEstablisherAsync(IConnectionPool * pool_,
ConnectionEstablisherAsync(ConnectionPoolPtr pool_,
const ConnectionTimeouts * timeouts_,
const Settings & settings_,
LoggerPtr log_,

View File

@ -63,7 +63,7 @@ IConnectionPool::Entry ConnectionPoolWithFailover::get(const ConnectionTimeouts
throw DB::Exception(DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED,
"Cannot get connection from ConnectionPoolWithFailover cause nested pools are empty");
TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
TryGetEntryFunc try_get_entry = [&](const NestedPoolPtr & pool, std::string & fail_message)
{
return tryGetEntry(pool, timeouts, fail_message, settings, {});
};
@ -126,7 +126,7 @@ std::vector<IConnectionPool::Entry> ConnectionPoolWithFailover::getMany(
std::optional<bool> skip_unavailable_endpoints,
GetPriorityForLoadBalancing::Func priority_func)
{
TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
TryGetEntryFunc try_get_entry = [&](const NestedPoolPtr & pool, std::string & fail_message)
{ return tryGetEntry(pool, timeouts, fail_message, settings, nullptr, async_callback); };
std::vector<TryResult> results = getManyImpl(settings, pool_mode, try_get_entry, skip_unavailable_endpoints, priority_func);
@ -143,7 +143,7 @@ std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::g
const Settings & settings,
PoolMode pool_mode)
{
TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
TryGetEntryFunc try_get_entry = [&](const NestedPoolPtr & pool, std::string & fail_message)
{
return tryGetEntry(pool, timeouts, fail_message, settings);
};
@ -160,7 +160,7 @@ std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::g
std::optional<bool> skip_unavailable_endpoints,
GetPriorityForLoadBalancing::Func priority_func)
{
TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
TryGetEntryFunc try_get_entry = [&](const NestedPoolPtr & pool, std::string & fail_message)
{ return tryGetEntry(pool, timeouts, fail_message, settings, &table_to_check, async_callback); };
return getManyImpl(settings, pool_mode, try_get_entry, skip_unavailable_endpoints, priority_func);
@ -216,7 +216,7 @@ std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::g
ConnectionPoolWithFailover::TryResult
ConnectionPoolWithFailover::tryGetEntry(
IConnectionPool & pool,
const ConnectionPoolPtr & pool,
const ConnectionTimeouts & timeouts,
std::string & fail_message,
const Settings & settings,
@ -226,7 +226,7 @@ ConnectionPoolWithFailover::tryGetEntry(
#if defined(OS_LINUX)
if (async_callback)
{
ConnectionEstablisherAsync connection_establisher_async(&pool, &timeouts, settings, log, table_to_check);
ConnectionEstablisherAsync connection_establisher_async(pool, &timeouts, settings, log, table_to_check);
while (true)
{
connection_establisher_async.resume();
@ -246,7 +246,7 @@ ConnectionPoolWithFailover::tryGetEntry(
}
#endif
ConnectionEstablisher connection_establisher(&pool, &timeouts, settings, log, table_to_check);
ConnectionEstablisher connection_establisher(pool, &timeouts, settings, log, table_to_check);
TryResult result;
connection_establisher.run(result, fail_message);
return result;

View File

@ -115,7 +115,7 @@ private:
/// If table_to_check is not null and the check is enabled in settings, check that replication delay
/// for this table is not too large.
TryResult tryGetEntry(
IConnectionPool & pool,
const ConnectionPoolPtr & pool,
const ConnectionTimeouts & timeouts,
std::string & fail_message,
const Settings & settings,

View File

@ -41,8 +41,9 @@ HedgedConnectionsFactory::HedgedConnectionsFactory(
, skip_unavailable_shards(skip_unavailable_shards_)
{
shuffled_pools = pool->getShuffledPools(settings_, priority_func);
for (auto shuffled_pool : shuffled_pools)
replicas.emplace_back(std::make_unique<ConnectionEstablisherAsync>(shuffled_pool.pool, &timeouts, settings_, log, table_to_check.get()));
for (const auto & shuffled_pool : shuffled_pools)
replicas.emplace_back(
std::make_unique<ConnectionEstablisherAsync>(shuffled_pool.pool, &timeouts, settings_, log, table_to_check.get()));
}
HedgedConnectionsFactory::~HedgedConnectionsFactory()
@ -413,7 +414,7 @@ HedgedConnectionsFactory::State HedgedConnectionsFactory::setBestUsableReplica(C
indexes.end(),
[&](size_t lhs, size_t rhs)
{
return replicas[lhs].connection_establisher->getResult().staleness < replicas[rhs].connection_establisher->getResult().staleness;
return replicas[lhs].connection_establisher->getResult().delay < replicas[rhs].connection_establisher->getResult().delay;
});
replicas[indexes[0]].is_ready = true;

View File

@ -143,6 +143,8 @@
M(AsynchronousInsertThreads, "Number of threads in the AsynchronousInsert thread pool.") \
M(AsynchronousInsertThreadsActive, "Number of threads in the AsynchronousInsert thread pool running a task.") \
M(AsynchronousInsertThreadsScheduled, "Number of queued or active jobs in the AsynchronousInsert thread pool.") \
M(AsynchronousInsertQueueSize, "Number of pending tasks in the AsynchronousInsert queue.") \
M(AsynchronousInsertQueueBytes, "Number of pending bytes in the AsynchronousInsert queue.") \
M(StartupSystemTablesThreads, "Number of threads in the StartupSystemTables thread pool.") \
M(StartupSystemTablesThreadsActive, "Number of threads in the StartupSystemTables thread pool running a task.") \
M(StartupSystemTablesThreadsScheduled, "Number of queued or active jobs in the StartupSystemTables thread pool.") \

View File

@ -2,8 +2,6 @@
#if defined(OS_LINUX)
#include <sys/epoll.h>
#include <vector>
#include <functional>
#include <boost/noncopyable.hpp>
#include <Poco/Logger.h>

View File

@ -2,8 +2,6 @@
#include <memory>
#include <base/defines.h>
#include <Poco/Channel.h>
#include <Poco/Logger.h>
#include <Poco/Message.h>
@ -26,16 +24,6 @@ using LoggerRawPtr = Poco::Logger *;
*/
LoggerPtr getLogger(const std::string & name);
/** Get Logger with specified name. If the Logger does not exists, it is created.
* This overload was added for specific purpose, when logger is constructed from constexpr string.
* Logger is destroyed only during program shutdown.
*/
template <size_t n>
ALWAYS_INLINE LoggerPtr getLogger(const char (&name)[n])
{
return Poco::Logger::getShared(name, false /*should_be_owned_by_shared_ptr_if_created*/);
}
/** Create Logger with specified name, channel and logging level.
* If Logger already exists, throws exception.
* Logger is destroyed, when last shared ptr that refers to Logger with specified name is destroyed.

View File

@ -73,26 +73,20 @@ public:
{
TryResult() = default;
explicit TryResult(Entry entry_)
: entry(std::move(entry_))
, is_usable(true)
, is_up_to_date(true)
{
}
void reset()
{
entry = Entry();
is_usable = false;
is_up_to_date = false;
staleness = 0.0;
delay = 0;
}
Entry entry;
bool is_usable = false; /// If false, the entry is unusable for current request
/// (but may be usable for other requests, so error counts are not incremented)
bool is_up_to_date = false; /// If true, the entry is a connection to up-to-date replica.
double staleness = 0.0; /// Helps choosing the "least stale" option when all replicas are stale.
Entry entry; /// use isNull() to check if connection is established
bool is_usable = false; /// if connection is established, then can be false only with table check
/// if table is not present on remote peer, -> it'll be false
bool is_up_to_date = false; /// If true, the entry is a connection to up-to-date replica
/// Depends on max_replica_delay_for_distributed_queries setting
UInt32 delay = 0; /// Helps choosing the "least stale" option when all replicas are stale.
};
struct PoolState;
@ -101,7 +95,7 @@ public:
struct ShuffledPool
{
NestedPool * pool{};
NestedPoolPtr pool{};
const PoolState * state{}; // WARNING: valid only during initial ordering, dangling
size_t index = 0;
size_t error_count = 0;
@ -110,7 +104,7 @@ public:
/// This functor must be provided by a client. It must perform a single try that takes a connection
/// from the provided pool and checks that it is good.
using TryGetEntryFunc = std::function<TryResult(NestedPool & pool, std::string & fail_message)>;
using TryGetEntryFunc = std::function<TryResult(const NestedPoolPtr & pool, std::string & fail_message)>;
/// The client can provide this functor to affect load balancing - the index of a pool is passed to
/// this functor. The pools with lower result value will be tried first.
@ -181,7 +175,7 @@ PoolWithFailoverBase<TNestedPool>::getShuffledPools(
std::vector<ShuffledPool> shuffled_pools;
shuffled_pools.reserve(nested_pools.size());
for (size_t i = 0; i < nested_pools.size(); ++i)
shuffled_pools.push_back(ShuffledPool{nested_pools[i].get(), &pool_states[i], i, /* error_count = */ 0, /* slowdown_count = */ 0});
shuffled_pools.push_back(ShuffledPool{nested_pools[i], &pool_states[i], i, /* error_count = */ 0, /* slowdown_count = */ 0});
::sort(
shuffled_pools.begin(), shuffled_pools.end(),
@ -267,7 +261,7 @@ PoolWithFailoverBase<TNestedPool>::getMany(
continue;
std::string fail_message;
result = try_get_entry(*shuffled_pool.pool, fail_message);
result = try_get_entry(shuffled_pool.pool, fail_message);
if (!fail_message.empty())
fail_messages += fail_message + '\n';
@ -309,8 +303,8 @@ PoolWithFailoverBase<TNestedPool>::getMany(
try_results.begin(), try_results.end(),
[](const TryResult & left, const TryResult & right)
{
return std::forward_as_tuple(!left.is_up_to_date, left.staleness)
< std::forward_as_tuple(!right.is_up_to_date, right.staleness);
return std::forward_as_tuple(!left.is_up_to_date, left.delay)
< std::forward_as_tuple(!right.is_up_to_date, right.delay);
});
if (fallback_to_stale_replicas)

View File

@ -9,7 +9,6 @@
#include <Poco/NullChannel.h>
#include <Poco/StreamChannel.h>
#include <sstream>
#include <thread>
TEST(Logger, Log)
@ -101,75 +100,3 @@ TEST(Logger, SideEffects)
LOG_TRACE(log, "test no throw {}", getLogMessageParamOrThrow());
}
TEST(Logger, SharedRawLogger)
{
{
std::ostringstream stream; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
auto stream_channel = Poco::AutoPtr<Poco::StreamChannel>(new Poco::StreamChannel(stream));
auto shared_logger = getLogger("Logger_1");
shared_logger->setChannel(stream_channel.get());
shared_logger->setLevel("trace");
LOG_TRACE(shared_logger, "SharedLogger1Log1");
LOG_TRACE(getRawLogger("Logger_1"), "RawLogger1Log");
LOG_TRACE(shared_logger, "SharedLogger1Log2");
auto actual = stream.str();
EXPECT_EQ(actual, "SharedLogger1Log1\nRawLogger1Log\nSharedLogger1Log2\n");
}
{
std::ostringstream stream; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
auto stream_channel = Poco::AutoPtr<Poco::StreamChannel>(new Poco::StreamChannel(stream));
auto * raw_logger = getRawLogger("Logger_2");
raw_logger->setChannel(stream_channel.get());
raw_logger->setLevel("trace");
LOG_TRACE(getLogger("Logger_2"), "SharedLogger2Log1");
LOG_TRACE(raw_logger, "RawLogger2Log");
LOG_TRACE(getLogger("Logger_2"), "SharedLogger2Log2");
auto actual = stream.str();
EXPECT_EQ(actual, "SharedLogger2Log1\nRawLogger2Log\nSharedLogger2Log2\n");
}
}
TEST(Logger, SharedLoggersThreadSafety)
{
static size_t threads_count = std::thread::hardware_concurrency();
static constexpr size_t loggers_count = 10;
static constexpr size_t logger_get_count = 1000;
Poco::Logger::root();
std::vector<std::string> names;
Poco::Logger::names(names);
size_t loggers_size_before = names.size();
std::vector<std::thread> threads;
for (size_t thread_index = 0; thread_index < threads_count; ++thread_index)
{
threads.emplace_back([]()
{
for (size_t logger_index = 0; logger_index < loggers_count; ++logger_index)
{
for (size_t iteration = 0; iteration < logger_get_count; ++iteration)
{
getLogger("Logger_" + std::to_string(logger_index));
}
}
});
}
for (auto & thread : threads)
thread.join();
Poco::Logger::names(names);
size_t loggers_size_after = names.size();
EXPECT_EQ(loggers_size_before, loggers_size_after);
}

View File

@ -750,7 +750,12 @@ class IColumn;
M(Seconds, wait_for_async_insert_timeout, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, "Timeout for waiting for processing asynchronous insertion", 0) \
M(UInt64, async_insert_max_data_size, 1000000, "Maximum size in bytes of unparsed data collected per query before being inserted", 0) \
M(UInt64, async_insert_max_query_number, 450, "Maximum number of insert queries before being inserted", 0) \
M(Milliseconds, async_insert_busy_timeout_ms, 200, "Maximum time to wait before dumping collected data per query since the first data appeared", 0) \
M(Milliseconds, async_insert_poll_timeout_ms, 10, "Timeout for polling data from asynchronous insert queue", 0) \
M(Bool, async_insert_use_adaptive_busy_timeout, true, "If it is set to true, use adaptive busy timeout for asynchronous inserts", 0) \
M(Milliseconds, async_insert_busy_timeout_min_ms, 50, "If auto-adjusting is enabled through async_insert_use_adaptive_busy_timeout, minimum time to wait before dumping collected data per query since the first data appeared. It also serves as the initial value for the adaptive algorithm", 0) \
M(Milliseconds, async_insert_busy_timeout_max_ms, 200, "Maximum time to wait before dumping collected data per query since the first data appeared.", 0) ALIAS(async_insert_busy_timeout_ms) \
M(Double, async_insert_busy_timeout_increase_rate, 0.2, "The exponential growth rate at which the adaptive asynchronous insert timeout increases", 0) \
M(Double, async_insert_busy_timeout_decrease_rate, 0.2, "The exponential growth rate at which the adaptive asynchronous insert timeout decreases", 0) \
\
M(UInt64, remote_fs_read_max_backoff_ms, 10000, "Max wait time when trying to read data for remote disk", 0) \
M(UInt64, remote_fs_read_backoff_max_tries, 5, "Max attempts to read with backoff", 0) \

View File

@ -84,6 +84,12 @@ namespace SettingsChangesHistory
/// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> settings_changes_history =
{
{"24.2", {{"async_insert_poll_timeout_ms", 10, 10, "Timeout in milliseconds for polling data from asynchronous insert queue"},
{"async_insert_use_adaptive_busy_timeout", true, true, "Use adaptive asynchronous insert timeout"},
{"async_insert_busy_timeout_min_ms", 50, 50, "The minimum value of the asynchronous insert timeout in milliseconds; it also serves as the initial value, which may be increased later by the adaptive algorithm"},
{"async_insert_busy_timeout_max_ms", 200, 200, "The minimum value of the asynchronous insert timeout in milliseconds; async_insert_busy_timeout_ms is aliased to async_insert_busy_timeout_max_ms"},
{"async_insert_busy_timeout_increase_rate", 0.2, 0.2, "The exponential growth rate at which the adaptive asynchronous insert timeout increases"},
{"async_insert_busy_timeout_decrease_rate", 0.2, 0.2, "The exponential growth rate at which the adaptive asynchronous insert timeout decreases"}}},
{"24.1", {{"print_pretty_type_names", false, true, "Better user experience."},
{"input_format_json_read_bools_as_strings", false, true, "Allow to read bools as strings in JSON formats by default"},
{"output_format_arrow_use_signed_indexes_for_dictionary", false, true, "Use signed indexes type for Arrow dictionaries by default as it's recommended"},

View File

@ -102,7 +102,7 @@ void checkS3Capabilities(
if (s3_capabilities.support_batch_delete && !checkBatchRemove(storage, key_with_trailing_slash))
{
LOG_WARNING(
getLogger("S3ObjectStorage"),
&Poco::Logger::get("S3ObjectStorage"),
"Storage for disk {} does not support batch delete operations, "
"so `s3_capabilities.support_batch_delete` was automatically turned off during the access check. "
"To remove this message set `s3_capabilities.support_batch_delete` for the disk to `false`.",

View File

@ -82,7 +82,7 @@ WebObjectStorage::loadFiles(const String & path, const std::unique_lock<std::sha
if (!inserted)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Loading data for {} more than once", file_path);
LOG_TRACE(getLogger("DiskWeb"), "Adding file: {}, size: {}", file_path, size);
LOG_TRACE(&Poco::Logger::get("DiskWeb"), "Adding file: {}, size: {}", file_path, size);
loaded_files.emplace_back(file_path);
}

View File

@ -32,8 +32,7 @@ ColumnsDescription AsynchronousInsertLogElement::getColumnsDescription()
{"Preprocessed", static_cast<Int8>(DataKind::Preprocessed)},
});
return ColumnsDescription
{
return ColumnsDescription{
{"hostname", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"event_date", std::make_shared<DataTypeDate>()},
{"event_time", std::make_shared<DataTypeDateTime>()},
@ -53,6 +52,7 @@ ColumnsDescription AsynchronousInsertLogElement::getColumnsDescription()
{"flush_time", std::make_shared<DataTypeDateTime>()},
{"flush_time_microseconds", std::make_shared<DataTypeDateTime64>(6)},
{"flush_query_id", std::make_shared<DataTypeString>()},
{"timeout_milliseconds", std::make_shared<DataTypeUInt64>()},
};
}
@ -80,6 +80,7 @@ void AsynchronousInsertLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(flush_time);
columns[i++]->insert(flush_time_microseconds);
columns[i++]->insert(flush_query_id);
columns[i++]->insert(timeout_milliseconds);
}
}

View File

@ -38,6 +38,7 @@ struct AsynchronousInsertLogElement
time_t flush_time{};
Decimal64 flush_time_microseconds{};
String flush_query_id;
UInt64 timeout_milliseconds = 0;
static std::string name() { return "AsynchronousInsertLog"; }
static ColumnsDescription getColumnsDescription();

View File

@ -33,13 +33,14 @@
#include <Common/SipHash.h>
#include <Common/logger_useful.h>
namespace CurrentMetrics
{
extern const Metric PendingAsyncInsert;
extern const Metric AsynchronousInsertThreads;
extern const Metric AsynchronousInsertThreadsActive;
extern const Metric AsynchronousInsertThreadsScheduled;
extern const Metric AsynchronousInsertQueueSize;
extern const Metric AsynchronousInsertQueueBytes;
}
namespace ProfileEvents
@ -60,6 +61,7 @@ namespace ErrorCodes
extern const int UNKNOWN_FORMAT;
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
extern const int INVALID_SETTING_VALUE;
}
static const NameSet settings_to_skip
@ -171,16 +173,41 @@ void AsynchronousInsertQueue::InsertData::Entry::finish(std::exception_ptr excep
}
}
AsynchronousInsertQueue::QueueShardFlushTimeHistory::TimePoints
AsynchronousInsertQueue::QueueShardFlushTimeHistory::getRecentTimePoints() const
{
std::shared_lock lock(mutex);
return time_points;
}
void AsynchronousInsertQueue::QueueShardFlushTimeHistory::updateWithCurrentTime()
{
std::unique_lock lock(mutex);
time_points.first = time_points.second;
time_points.second = std::chrono::steady_clock::now();
}
AsynchronousInsertQueue::AsynchronousInsertQueue(ContextPtr context_, size_t pool_size_, bool flush_on_shutdown_)
: WithContext(context_)
, pool_size(pool_size_)
, flush_on_shutdown(flush_on_shutdown_)
, queue_shards(pool_size)
, pool(CurrentMetrics::AsynchronousInsertThreads, CurrentMetrics::AsynchronousInsertThreadsActive, CurrentMetrics::AsynchronousInsertThreadsScheduled, pool_size)
, flush_time_history_per_queue_shard(pool_size)
, pool(
CurrentMetrics::AsynchronousInsertThreads,
CurrentMetrics::AsynchronousInsertThreadsActive,
CurrentMetrics::AsynchronousInsertThreadsScheduled,
pool_size)
{
if (!pool_size)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "pool_size cannot be zero");
const auto & settings = getContext()->getSettingsRef();
for (size_t i = 0; i < pool_size; ++i)
queue_shards[i].busy_timeout_ms
= std::min(Milliseconds(settings.async_insert_busy_timeout_min_ms), Milliseconds(settings.async_insert_busy_timeout_max_ms));
for (size_t i = 0; i < pool_size; ++i)
dump_by_first_update_threads.emplace_back([this, i] { processBatchDeadlines(i); });
}
@ -201,7 +228,7 @@ AsynchronousInsertQueue::~AsynchronousInsertQueue()
if (flush_on_shutdown)
{
for (auto & [_, elem] : shard.queue)
scheduleDataProcessingJob(elem.key, std::move(elem.data), getContext());
scheduleDataProcessingJob(elem.key, std::move(elem.data), getContext(), i);
}
else
{
@ -217,14 +244,14 @@ AsynchronousInsertQueue::~AsynchronousInsertQueue()
LOG_TRACE(log, "Asynchronous insertion queue finished");
}
void AsynchronousInsertQueue::scheduleDataProcessingJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context)
void AsynchronousInsertQueue::scheduleDataProcessingJob(
const InsertQuery & key, InsertDataPtr data, ContextPtr global_context, size_t shard_num)
{
/// Wrap 'unique_ptr' with 'shared_ptr' to make this
/// lambda copyable and allow to save it to the thread pool.
pool.scheduleOrThrowOnError([key, global_context, my_data = std::make_shared<InsertDataPtr>(std::move(data))]() mutable
{
processData(key, std::move(*my_data), std::move(global_context));
});
pool.scheduleOrThrowOnError(
[this, key, global_context, shard_num, my_data = std::make_shared<InsertDataPtr>(std::move(data))]() mutable
{ processData(key, std::move(*my_data), std::move(global_context), flush_time_history_per_queue_shard[shard_num]); });
}
void AsynchronousInsertQueue::preprocessInsertQuery(const ASTPtr & query, const ContextPtr & query_context)
@ -300,6 +327,7 @@ AsynchronousInsertQueue::PushResult
AsynchronousInsertQueue::pushDataChunk(ASTPtr query, DataChunk chunk, ContextPtr query_context)
{
const auto & settings = query_context->getSettingsRef();
validateSettings(settings, log);
auto & insert_query = query->as<ASTInsertQuery &>();
auto data_kind = chunk.getDataKind();
@ -319,23 +347,22 @@ AsynchronousInsertQueue::pushDataChunk(ASTPtr query, DataChunk chunk, ContextPtr
auto shard_num = key.hash % pool_size;
auto & shard = queue_shards[shard_num];
const auto flush_time_points = flush_time_history_per_queue_shard[shard_num].getRecentTimePoints();
{
std::lock_guard lock(shard.mutex);
auto [it, inserted] = shard.iterators.try_emplace(key.hash);
auto now = std::chrono::steady_clock::now();
auto timeout_ms = getBusyWaitTimeoutMs(settings, shard, shard_num, flush_time_points, now);
if (inserted)
{
auto now = std::chrono::steady_clock::now();
auto timeout = now + Milliseconds{key.settings.async_insert_busy_timeout_ms};
it->second = shard.queue.emplace(timeout, Container{key, std::make_unique<InsertData>()}).first;
}
it->second = shard.queue.emplace(now + timeout_ms, Container{key, std::make_unique<InsertData>(timeout_ms)}).first;
auto queue_it = it->second;
auto & data = queue_it->second.data;
size_t entry_data_size = entry->chunk.byteSize();
assert(data);
auto size_in_bytes = data->size_in_bytes;
data->size_in_bytes += entry_data_size;
data->entries.emplace_back(entry);
insert_future = entry->getFuture();
@ -346,23 +373,50 @@ AsynchronousInsertQueue::pushDataChunk(ASTPtr query, DataChunk chunk, ContextPtr
bool has_enough_bytes = data->size_in_bytes >= key.settings.async_insert_max_data_size;
bool has_enough_queries = data->entries.size() >= key.settings.async_insert_max_query_number && key.settings.async_insert_deduplicate;
/// Here we check whether we hit the limit on maximum data size in the buffer.
/// And use setting from query context.
/// It works, because queries with the same set of settings are already grouped together.
if (!flush_stopped && (has_enough_bytes || has_enough_queries))
auto max_busy_timeout_exceeded = [&shard, &settings, &now, &flush_time_points]() -> bool
{
if (!settings.async_insert_use_adaptive_busy_timeout || !shard.last_insert_time || !flush_time_points.first)
return false;
auto max_ms = Milliseconds(settings.async_insert_busy_timeout_max_ms);
return *shard.last_insert_time + max_ms < now && *flush_time_points.first + max_ms < *flush_time_points.second;
};
/// Here we check whether we have hit the limit on the maximum data size in the buffer or
/// if the elapsed time since the last insert exceeds the maximum busy wait timeout.
/// We also use the limit settings from the query context.
/// This works because queries with the same set of settings are already grouped together.
if (!flush_stopped && (has_enough_bytes || has_enough_queries || max_busy_timeout_exceeded()))
{
data->timeout_ms = Milliseconds::zero();
data_to_process = std::move(data);
shard.iterators.erase(it);
shard.queue.erase(queue_it);
}
shard.last_insert_time = now;
shard.busy_timeout_ms = timeout_ms;
CurrentMetrics::add(CurrentMetrics::PendingAsyncInsert);
ProfileEvents::increment(ProfileEvents::AsyncInsertQuery);
ProfileEvents::increment(ProfileEvents::AsyncInsertBytes, entry_data_size);
if (data_to_process)
{
if (!inserted)
CurrentMetrics::sub(CurrentMetrics::AsynchronousInsertQueueSize);
CurrentMetrics::sub(CurrentMetrics::AsynchronousInsertQueueBytes, size_in_bytes);
}
else
{
if (inserted)
CurrentMetrics::add(CurrentMetrics::AsynchronousInsertQueueSize);
CurrentMetrics::add(CurrentMetrics::AsynchronousInsertQueueBytes, entry_data_size);
}
}
if (data_to_process)
scheduleDataProcessingJob(key, std::move(data_to_process), getContext());
scheduleDataProcessingJob(key, std::move(data_to_process), getContext(), shard_num);
else
shard.are_tasks_available.notify_one();
@ -374,6 +428,98 @@ AsynchronousInsertQueue::pushDataChunk(ASTPtr query, DataChunk chunk, ContextPtr
};
}
AsynchronousInsertQueue::Milliseconds AsynchronousInsertQueue::getBusyWaitTimeoutMs(
const Settings & settings,
const QueueShard & shard,
size_t shard_num,
const QueueShardFlushTimeHistory::TimePoints & flush_time_points,
std::chrono::steady_clock::time_point now) const
{
if (!settings.async_insert_use_adaptive_busy_timeout)
return settings.async_insert_busy_timeout_max_ms;
const auto max_ms = Milliseconds(settings.async_insert_busy_timeout_max_ms);
const auto min_ms = std::min(std::max(Milliseconds(settings.async_insert_busy_timeout_min_ms), Milliseconds(1)), max_ms);
auto normalize = [&min_ms, &max_ms](const auto & t_ms) { return std::min(std::max(t_ms, min_ms), max_ms); };
if (!shard.last_insert_time || !flush_time_points.first)
return normalize(shard.busy_timeout_ms);
const auto & last_insert_time = *shard.last_insert_time;
const auto & [t1, t2] = std::tie(*flush_time_points.first, *flush_time_points.second);
const double increase_rate = settings.async_insert_busy_timeout_increase_rate;
const double decrease_rate = settings.async_insert_busy_timeout_decrease_rate;
const auto decreased_timeout_ms = std::min(
std::chrono::duration_cast<Milliseconds>(shard.busy_timeout_ms / (1.0 + decrease_rate)), shard.busy_timeout_ms - Milliseconds(1));
/// Increase the timeout for frequent inserts.
if (last_insert_time + min_ms > now)
{
auto timeout_ms = std::max(
std::chrono::duration_cast<Milliseconds>(shard.busy_timeout_ms * (1.0 + increase_rate)),
shard.busy_timeout_ms + Milliseconds(1));
if (timeout_ms != shard.busy_timeout_ms)
LOG_TRACE(
log,
"Async timeout increased from {} to {} for queue shard {}.",
shard.busy_timeout_ms.count(),
timeout_ms.count(),
shard_num);
return normalize(timeout_ms);
}
/// Decrease the timeout if inserts are not frequent,
/// that is, if the time since the last insert and the difference between the last two queue flushes were both
/// long enough (exceeding the adjusted timeout).
/// This ensures the timeout value converges to the minimum over time for non-frequent inserts.
else if (last_insert_time + decreased_timeout_ms < now && t1 + decreased_timeout_ms < t2)
{
auto timeout_ms = decreased_timeout_ms;
if (timeout_ms != shard.busy_timeout_ms)
LOG_TRACE(
log,
"Async timeout decreased from {} to {} for queue shard {}.",
shard.busy_timeout_ms.count(),
timeout_ms.count(),
shard_num);
return normalize(timeout_ms);
}
return normalize(shard.busy_timeout_ms);
}
void AsynchronousInsertQueue::validateSettings(const Settings & settings, LoggerPtr log)
{
const auto max_ms = std::chrono::milliseconds(settings.async_insert_busy_timeout_max_ms);
if (max_ms == std::chrono::milliseconds::zero())
throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "Setting 'async_insert_busy_timeout_max_ms' can't be zero");
if (!settings.async_insert_use_adaptive_busy_timeout)
return;
/// Adaptive timeout settings.
const auto min_ms = std::chrono::milliseconds(settings.async_insert_busy_timeout_min_ms);
if (min_ms > max_ms)
if (log)
LOG_WARNING(
log,
"Setting 'async_insert_busy_timeout_min_ms'={} is greater than 'async_insert_busy_timeout_max_ms'={}. Ignoring "
"'async_insert_busy_timeout_min_ms'",
min_ms.count(),
max_ms.count());
if (settings.async_insert_busy_timeout_increase_rate <= 0)
throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "Setting 'async_insert_busy_timeout_increase_rate' must be greater than zero");
if (settings.async_insert_busy_timeout_decrease_rate <= 0)
throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "Setting 'async_insert_busy_timeout_decrease_rate' must be greater than zero");
}
void AsynchronousInsertQueue::flushAll()
{
std::lock_guard flush_lock(flush_mutex);
@ -395,14 +541,15 @@ void AsynchronousInsertQueue::flushAll()
size_t total_bytes = 0;
size_t total_entries = 0;
for (auto & queue : queues_to_flush)
for (size_t i = 0; i < pool_size; ++i)
{
auto & queue = queues_to_flush[i];
total_queries += queue.size();
for (auto & [_, entry] : queue)
{
total_bytes += entry.data->size_in_bytes;
total_entries += entry.data->entries.size();
scheduleDataProcessingJob(entry.key, std::move(entry.data), getContext());
scheduleDataProcessingJob(entry.key, std::move(entry.data), getContext(), i);
}
}
@ -429,17 +576,21 @@ void AsynchronousInsertQueue::processBatchDeadlines(size_t shard_num)
{
std::unique_lock lock(shard.mutex);
shard.are_tasks_available.wait_for(lock,
Milliseconds(getContext()->getSettingsRef().async_insert_busy_timeout_ms), [&shard, this]
{
if (shutdown)
return true;
const auto rel_time
= std::min(shard.busy_timeout_ms, Milliseconds(getContext()->getSettingsRef().async_insert_poll_timeout_ms));
shard.are_tasks_available.wait_for(
lock,
rel_time,
[&shard, this]
{
if (shutdown)
return true;
if (!shard.queue.empty() && shard.queue.begin()->first < std::chrono::steady_clock::now())
return true;
if (!shard.queue.empty() && shard.queue.begin()->first < std::chrono::steady_clock::now())
return true;
return false;
});
return false;
});
if (shutdown)
return;
@ -449,21 +600,30 @@ void AsynchronousInsertQueue::processBatchDeadlines(size_t shard_num)
const auto now = std::chrono::steady_clock::now();
size_t size_in_bytes = 0;
while (true)
{
if (shard.queue.empty() || shard.queue.begin()->first > now)
break;
auto it = shard.queue.begin();
size_in_bytes += it->second.data->size_in_bytes;
shard.iterators.erase(it->second.key.hash);
entries_to_flush.emplace_back(std::move(it->second));
shard.queue.erase(it);
}
if (!entries_to_flush.empty())
{
CurrentMetrics::sub(CurrentMetrics::AsynchronousInsertQueueSize, entries_to_flush.size());
CurrentMetrics::sub(CurrentMetrics::AsynchronousInsertQueueBytes, size_in_bytes);
}
}
for (auto & entry : entries_to_flush)
scheduleDataProcessingJob(entry.key, std::move(entry.data), getContext());
scheduleDataProcessingJob(entry.key, std::move(entry.data), getContext(), shard_num);
}
}
@ -507,7 +667,8 @@ String serializeQuery(const IAST & query, size_t max_length)
}
// static
void AsynchronousInsertQueue::processData(InsertQuery key, InsertDataPtr data, ContextPtr global_context)
void AsynchronousInsertQueue::processData(
InsertQuery key, InsertDataPtr data, ContextPtr global_context, QueueShardFlushTimeHistory & queue_shard_flush_time_history)
try
{
if (!data)
@ -613,9 +774,12 @@ try
throw;
}
auto add_entry_to_log = [&](
const auto & entry, const auto & entry_query_for_logging,
const auto & exception, size_t num_rows, size_t num_bytes)
auto add_entry_to_log = [&](const auto & entry,
const auto & entry_query_for_logging,
const auto & exception,
size_t num_rows,
size_t num_bytes,
Milliseconds timeout_ms)
{
if (!async_insert_log)
return;
@ -632,6 +796,7 @@ try
elem.rows = num_rows;
elem.exception = exception;
elem.data_kind = entry->chunk.getDataKind();
elem.timeout_milliseconds = timeout_ms.count();
/// If there was a parsing error,
/// the entry won't be flushed anyway,
@ -666,9 +831,9 @@ try
auto header = pipeline.getHeader();
if (key.data_kind == DataKind::Parsed)
chunk = processEntriesWithParsing(key, data->entries, header, insert_context, log, add_entry_to_log);
chunk = processEntriesWithParsing(key, data, header, insert_context, log, add_entry_to_log);
else
chunk = processPreprocessedEntries(key, data->entries, header, insert_context, add_entry_to_log);
chunk = processPreprocessedEntries(key, data, header, insert_context, add_entry_to_log);
ProfileEvents::increment(ProfileEvents::AsyncInsertRows, chunk.getNumRows());
@ -691,6 +856,8 @@ try
LOG_INFO(log, "Flushed {} rows, {} bytes for query '{}'", num_rows, num_bytes, key.query_str);
queue_shard_flush_time_history.updateWithCurrentTime();
bool pulling_pipeline = false;
logQueryFinish(query_log_elem, insert_context, key.query, pipeline, pulling_pipeline, query_span, QueryCache::Usage::None, internal);
}
@ -729,7 +896,7 @@ catch (...)
template <typename LogFunc>
Chunk AsynchronousInsertQueue::processEntriesWithParsing(
const InsertQuery & key,
const std::list<InsertData::EntryPtr> & entries,
const InsertDataPtr & data,
const Block & header,
const ContextPtr & insert_context,
const LoggerPtr logger,
@ -770,7 +937,7 @@ Chunk AsynchronousInsertQueue::processEntriesWithParsing(
auto chunk_info = std::make_shared<AsyncInsertInfo>();
auto query_for_logging = serializeQuery(*key.query, insert_context->getSettingsRef().log_queries_cut_to_length);
for (const auto & entry : entries)
for (const auto & entry : data->entries)
{
current_entry = entry;
@ -786,7 +953,7 @@ Chunk AsynchronousInsertQueue::processEntriesWithParsing(
chunk_info->offsets.push_back(total_rows);
chunk_info->tokens.push_back(entry->async_dedup_token);
add_to_async_insert_log(entry, query_for_logging, current_exception, num_rows, num_bytes);
add_to_async_insert_log(entry, query_for_logging, current_exception, num_rows, num_bytes, data->timeout_ms);
current_exception.clear();
}
@ -798,7 +965,7 @@ Chunk AsynchronousInsertQueue::processEntriesWithParsing(
template <typename LogFunc>
Chunk AsynchronousInsertQueue::processPreprocessedEntries(
const InsertQuery & key,
const std::list<InsertData::EntryPtr> & entries,
const InsertDataPtr & data,
const Block & header,
const ContextPtr & insert_context,
LogFunc && add_to_async_insert_log)
@ -821,7 +988,7 @@ Chunk AsynchronousInsertQueue::processPreprocessedEntries(
return it->second;
};
for (const auto & entry : entries)
for (const auto & entry : data->entries)
{
const auto * block = entry->chunk.asBlock();
if (!block)
@ -837,7 +1004,7 @@ Chunk AsynchronousInsertQueue::processPreprocessedEntries(
chunk_info->tokens.push_back(entry->async_dedup_token);
const auto & query_for_logging = get_query_by_format(entry->format);
add_to_async_insert_log(entry, query_for_logging, "", block->rows(), block->bytes());
add_to_async_insert_log(entry, query_for_logging, "", block->rows(), block->bytes(), data->timeout_ms);
}
Chunk chunk(std::move(result_columns), total_rows);

View File

@ -10,6 +10,7 @@
#include <Processors/Chunk.h>
#include <future>
#include <shared_mutex>
#include <variant>
namespace DB
@ -53,6 +54,8 @@ public:
Preprocessed = 1,
};
static void validateSettings(const Settings & settings, LoggerPtr log);
/// Force flush the whole queue.
void flushAll();
@ -146,6 +149,9 @@ private:
std::atomic_bool finished = false;
};
InsertData() = default;
explicit InsertData(Milliseconds timeout_ms_) : timeout_ms(timeout_ms_) { }
~InsertData()
{
auto it = entries.begin();
@ -163,6 +169,7 @@ private:
std::list<EntryPtr> entries;
size_t size_in_bytes = 0;
Milliseconds timeout_ms = Milliseconds::zero();
};
using InsertDataPtr = std::unique_ptr<InsertData>;
@ -180,6 +187,8 @@ private:
using QueueIterator = Queue::iterator;
using QueueIteratorByKey = std::unordered_map<UInt128, QueueIterator>;
using OptionalTimePoint = std::optional<std::chrono::steady_clock::time_point>;
struct QueueShard
{
mutable std::mutex mutex;
@ -187,12 +196,30 @@ private:
Queue queue;
QueueIteratorByKey iterators;
OptionalTimePoint last_insert_time;
std::chrono::milliseconds busy_timeout_ms;
};
/// Times of the two most recent queue flushes.
/// Used to calculate adaptive timeout.
struct QueueShardFlushTimeHistory
{
public:
using TimePoints = std::pair<OptionalTimePoint, OptionalTimePoint>;
TimePoints getRecentTimePoints() const;
void updateWithCurrentTime();
private:
mutable std::shared_mutex mutex;
TimePoints time_points;
};
const size_t pool_size;
const bool flush_on_shutdown;
std::vector<QueueShard> queue_shards;
std::vector<QueueShardFlushTimeHistory> flush_time_history_per_queue_shard;
/// Logic and events behind queue are as follows:
/// - async_insert_busy_timeout_ms:
@ -217,17 +244,26 @@ private:
LoggerPtr log = getLogger("AsynchronousInsertQueue");
PushResult pushDataChunk(ASTPtr query, DataChunk chunk, ContextPtr query_context);
Milliseconds getBusyWaitTimeoutMs(
const Settings & settings,
const QueueShard & shard,
size_t shard_num,
const QueueShardFlushTimeHistory::TimePoints & flush_time_points,
std::chrono::steady_clock::time_point now) const;
void preprocessInsertQuery(const ASTPtr & query, const ContextPtr & query_context);
void processBatchDeadlines(size_t shard_num);
void scheduleDataProcessingJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context);
void scheduleDataProcessingJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context, size_t shard_num);
static void processData(InsertQuery key, InsertDataPtr data, ContextPtr global_context);
static void processData(
InsertQuery key, InsertDataPtr data, ContextPtr global_context, QueueShardFlushTimeHistory & queue_shard_flush_time_history);
template <typename LogFunc>
static Chunk processEntriesWithParsing(
const InsertQuery & key,
const std::list<InsertData::EntryPtr> & entries,
const InsertDataPtr & data,
const Block & header,
const ContextPtr & insert_context,
const LoggerPtr logger,
@ -236,7 +272,7 @@ private:
template <typename LogFunc>
static Chunk processPreprocessedEntries(
const InsertQuery & key,
const std::list<InsertData::EntryPtr> & entries,
const InsertDataPtr & data,
const Block & header,
const ContextPtr & insert_context,
LogFunc && add_to_async_insert_log);

View File

@ -4862,10 +4862,10 @@ AsynchronousInsertQueue * Context::getAsynchronousInsertQueue() const
void Context::setAsynchronousInsertQueue(const std::shared_ptr<AsynchronousInsertQueue> & ptr)
{
using namespace std::chrono;
AsynchronousInsertQueue::validateSettings(settings, getLogger("Context"));
if (std::chrono::milliseconds(settings.async_insert_busy_timeout_ms) == 0ms)
throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "Setting async_insert_busy_timeout_ms can't be zero");
if (std::chrono::milliseconds(settings.async_insert_poll_timeout_ms) == std::chrono::milliseconds::zero())
throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "Setting async_insert_poll_timeout_ms can't be zero");
shared->async_insert_queue = ptr;
}

View File

@ -99,7 +99,7 @@ static void dumpMemoryTracker(ProfileEventsSnapshot const & snapshot, DB::Mutabl
}
void getProfileEvents(
const String & server_display_name,
const String & host_name,
DB::InternalProfileEventsQueuePtr profile_queue,
DB::Block & block,
ThreadIdToCountersSnapshot & last_sent_snapshots)
@ -139,8 +139,8 @@ void getProfileEvents(
}
last_sent_snapshots = std::move(new_snapshots);
dumpProfileEvents(group_snapshot, columns, server_display_name);
dumpMemoryTracker(group_snapshot, columns, server_display_name);
dumpProfileEvents(group_snapshot, columns, host_name);
dumpMemoryTracker(group_snapshot, columns, host_name);
Block curr_block;

View File

@ -26,7 +26,7 @@ using ThreadIdToCountersSnapshot = std::unordered_map<UInt64, Counters::Snapshot
void dumpToMapColumn(const Counters::Snapshot & counters, DB::IColumn * column, bool nonzero_only = true);
void getProfileEvents(
const String & server_display_name,
const String & host_name,
DB::InternalProfileEventsQueuePtr profile_queue,
DB::Block & block,
ThreadIdToCountersSnapshot & last_sent_snapshots);

View File

@ -5,7 +5,7 @@
#include <Processors/ISource.h>
#include <Interpreters/Context_fwd.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/MergeTree/ZooKeeperRetries.h>
#include <Common/ZooKeeper/ZooKeeperRetries.h>
namespace zkutil

View File

@ -178,11 +178,11 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStream
throw;
}
double max_remote_delay = 0.0;
UInt32 max_remote_delay = 0;
for (const auto & try_result : try_results)
{
if (!try_result.is_up_to_date)
max_remote_delay = std::max(try_result.staleness, max_remote_delay);
max_remote_delay = std::max(try_result.delay, max_remote_delay);
}
if (try_results.empty() || local_delay < max_remote_delay)

View File

@ -64,9 +64,14 @@ RemoteQueryExecutor::RemoteQueryExecutor(
RemoteQueryExecutor::RemoteQueryExecutor(
Connection & connection,
const String & query_, const Block & header_, ContextPtr context_,
ThrottlerPtr throttler, const Scalars & scalars_, const Tables & external_tables_,
QueryProcessingStage::Enum stage_, std::optional<Extension> extension_)
const String & query_,
const Block & header_,
ContextPtr context_,
ThrottlerPtr throttler,
const Scalars & scalars_,
const Tables & external_tables_,
QueryProcessingStage::Enum stage_,
std::optional<Extension> extension_)
: RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, extension_)
{
create_connections = [this, &connection, throttler, extension_](AsyncCallback)
@ -80,9 +85,14 @@ RemoteQueryExecutor::RemoteQueryExecutor(
RemoteQueryExecutor::RemoteQueryExecutor(
std::shared_ptr<Connection> connection_ptr,
const String & query_, const Block & header_, ContextPtr context_,
ThrottlerPtr throttler, const Scalars & scalars_, const Tables & external_tables_,
QueryProcessingStage::Enum stage_, std::optional<Extension> extension_)
const String & query_,
const Block & header_,
ContextPtr context_,
ThrottlerPtr throttler,
const Scalars & scalars_,
const Tables & external_tables_,
QueryProcessingStage::Enum stage_,
std::optional<Extension> extension_)
: RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, extension_)
{
create_connections = [this, connection_ptr, throttler, extension_](AsyncCallback)
@ -96,12 +106,18 @@ RemoteQueryExecutor::RemoteQueryExecutor(
RemoteQueryExecutor::RemoteQueryExecutor(
std::vector<IConnectionPool::Entry> && connections_,
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_,
QueryProcessingStage::Enum stage_, std::optional<Extension> extension_)
const String & query_,
const Block & header_,
ContextPtr context_,
const ThrottlerPtr & throttler,
const Scalars & scalars_,
const Tables & external_tables_,
QueryProcessingStage::Enum stage_,
std::optional<Extension> extension_)
: RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, extension_)
{
create_connections = [this, connections_, throttler, extension_](AsyncCallback) mutable {
create_connections = [this, connections_, throttler, extension_](AsyncCallback) mutable
{
auto res = std::make_unique<MultiplexedConnections>(std::move(connections_), context->getSettingsRef(), throttler);
if (extension_ && extension_->replica_info)
res->setReplicaInfo(*extension_->replica_info);

View File

@ -50,29 +50,43 @@ public:
std::shared_ptr<TaskIterator> task_iterator = nullptr;
std::shared_ptr<ParallelReplicasReadingCoordinator> parallel_reading_coordinator = nullptr;
std::optional<IConnections::ReplicaInfo> replica_info = {};
GetPriorityForLoadBalancing::Func priority_func;
};
/// Takes already set connection.
RemoteQueryExecutor(
Connection & connection,
const String & query_, const Block & header_, ContextPtr context_,
ThrottlerPtr throttler_ = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, std::optional<Extension> extension_ = std::nullopt);
const String & query_,
const Block & header_,
ContextPtr context_,
ThrottlerPtr throttler_ = nullptr,
const Scalars & scalars_ = Scalars(),
const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
std::optional<Extension> extension_ = std::nullopt);
/// Takes already set connection.
RemoteQueryExecutor(
std::shared_ptr<Connection> connection,
const String & query_, const Block & header_, ContextPtr context_,
ThrottlerPtr throttler_ = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, std::optional<Extension> extension_ = std::nullopt);
const String & query_,
const Block & header_,
ContextPtr context_,
ThrottlerPtr throttler_ = nullptr,
const Scalars & scalars_ = Scalars(),
const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
std::optional<Extension> extension_ = std::nullopt);
/// Accepts several connections already taken from pool.
RemoteQueryExecutor(
std::vector<IConnectionPool::Entry> && connections_,
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, std::optional<Extension> extension_ = std::nullopt);
const String & query_,
const Block & header_,
ContextPtr context_,
const ThrottlerPtr & throttler = nullptr,
const Scalars & scalars_ = Scalars(),
const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
std::optional<Extension> extension_ = std::nullopt);
/// Takes a pool and gets one or several connections from it.
RemoteQueryExecutor(

View File

@ -184,7 +184,15 @@ void validateClientInfo(const ClientInfo & session_client_info, const ClientInfo
namespace DB
{
TCPHandler::TCPHandler(IServer & server_, TCPServer & tcp_server_, const Poco::Net::StreamSocket & socket_, bool parse_proxy_protocol_, std::string server_display_name_, const ProfileEvents::Event & read_event_, const ProfileEvents::Event & write_event_)
TCPHandler::TCPHandler(
IServer & server_,
TCPServer & tcp_server_,
const Poco::Net::StreamSocket & socket_,
bool parse_proxy_protocol_,
std::string server_display_name_,
std::string host_name_,
const ProfileEvents::Event & read_event_,
const ProfileEvents::Event & write_event_)
: Poco::Net::TCPServerConnection(socket_)
, server(server_)
, tcp_server(tcp_server_)
@ -193,11 +201,20 @@ TCPHandler::TCPHandler(IServer & server_, TCPServer & tcp_server_, const Poco::N
, read_event(read_event_)
, write_event(write_event_)
, server_display_name(std::move(server_display_name_))
, host_name(std::move(host_name_))
{
}
TCPHandler::TCPHandler(IServer & server_, TCPServer & tcp_server_, const Poco::Net::StreamSocket & socket_, TCPProtocolStackData & stack_data, std::string server_display_name_, const ProfileEvents::Event & read_event_, const ProfileEvents::Event & write_event_)
: Poco::Net::TCPServerConnection(socket_)
TCPHandler::TCPHandler(
IServer & server_,
TCPServer & tcp_server_,
const Poco::Net::StreamSocket & socket_,
TCPProtocolStackData & stack_data,
std::string server_display_name_,
std::string host_name_,
const ProfileEvents::Event & read_event_,
const ProfileEvents::Event & write_event_)
: Poco::Net::TCPServerConnection(socket_)
, server(server_)
, tcp_server(tcp_server_)
, log(getLogger("TCPHandler"))
@ -207,6 +224,7 @@ TCPHandler::TCPHandler(IServer & server_, TCPServer & tcp_server_, const Poco::N
, write_event(write_event_)
, default_database(stack_data.default_database)
, server_display_name(std::move(server_display_name_))
, host_name(std::move(host_name_))
{
if (!forwarded_for.empty())
LOG_TRACE(log, "Forwarded client address: {}", forwarded_for);
@ -1201,7 +1219,7 @@ void TCPHandler::sendExtremes(const Block & extremes)
void TCPHandler::sendProfileEvents()
{
Block block;
ProfileEvents::getProfileEvents(server_display_name, state.profile_queue, block, last_sent_snapshots);
ProfileEvents::getProfileEvents(host_name, state.profile_queue, block, last_sent_snapshots);
if (block.rows() != 0)
{
initProfileEventsBlockOutput(block);

View File

@ -147,8 +147,24 @@ public:
* because it allows to check the IP ranges of the trusted proxy.
* Proxy-forwarded (original client) IP address is used for quota accounting if quota is keyed by forwarded IP.
*/
TCPHandler(IServer & server_, TCPServer & tcp_server_, const Poco::Net::StreamSocket & socket_, bool parse_proxy_protocol_, std::string server_display_name_, const ProfileEvents::Event & read_event_ = ProfileEvents::end(), const ProfileEvents::Event & write_event_ = ProfileEvents::end());
TCPHandler(IServer & server_, TCPServer & tcp_server_, const Poco::Net::StreamSocket & socket_, TCPProtocolStackData & stack_data, std::string server_display_name_, const ProfileEvents::Event & read_event_ = ProfileEvents::end(), const ProfileEvents::Event & write_event_ = ProfileEvents::end());
TCPHandler(
IServer & server_,
TCPServer & tcp_server_,
const Poco::Net::StreamSocket & socket_,
bool parse_proxy_protocol_,
String server_display_name_,
String host_name_,
const ProfileEvents::Event & read_event_ = ProfileEvents::end(),
const ProfileEvents::Event & write_event_ = ProfileEvents::end());
TCPHandler(
IServer & server_,
TCPServer & tcp_server_,
const Poco::Net::StreamSocket & socket_,
TCPProtocolStackData & stack_data,
String server_display_name_,
String host_name_,
const ProfileEvents::Event & read_event_ = ProfileEvents::end(),
const ProfileEvents::Event & write_event_ = ProfileEvents::end());
~TCPHandler() override;
void run() override;
@ -225,6 +241,7 @@ private:
/// It is the name of the server that will be sent to the client.
String server_display_name;
String host_name;
void runImpl();

View File

@ -19,6 +19,7 @@ private:
IServer & server;
bool parse_proxy_protocol = false;
LoggerPtr log;
std::string host_name;
std::string server_display_name;
ProfileEvents::Event read_event;
@ -42,7 +43,8 @@ public:
, read_event(read_event_)
, write_event(write_event_)
{
server_display_name = server.config().getString("display_name", getFQDNOrHostName());
host_name = getFQDNOrHostName();
server_display_name = server.config().getString("display_name", host_name);
}
Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket, TCPServer & tcp_server) override
@ -50,7 +52,7 @@ public:
try
{
LOG_TRACE(log, "TCP Request. Address: {}", socket.peerAddress().toString());
return new TCPHandler(server, tcp_server, socket, parse_proxy_protocol, server_display_name, read_event, write_event);
return new TCPHandler(server, tcp_server, socket, parse_proxy_protocol, server_display_name, host_name, read_event, write_event);
}
catch (const Poco::Net::NetException &)
{
@ -64,7 +66,7 @@ public:
try
{
LOG_TRACE(log, "TCP Request. Address: {}", socket.peerAddress().toString());
return new TCPHandler(server, tcp_server, socket, stack_data, server_display_name, read_event, write_event);
return new TCPHandler(server, tcp_server, socket, stack_data, server_display_name, host_name, read_event, write_event);
}
catch (const Poco::Net::NetException &)
{

View File

@ -720,9 +720,12 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks
}
catch (...)
{
/// Don't scare people with broken part error
if (!isRetryableException(std::current_exception()))
LOG_ERROR(storage.log, "Part {} is broken and need manual correction", getDataPartStorage().getFullPath());
// There could be conditions that data part to be loaded is broken, but some of meta infos are already written
// into meta data before exception, need to clean them all.
LOG_ERROR(storage.log, "Part {} is broken and need manual correction", getDataPartStorage().getFullPath());
metadata_manager->deleteAll(/*include_projection*/ true);
metadata_manager->assertAllDeleted(/*include_projection*/ true);
throw;

View File

@ -180,7 +180,7 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> cloneSourcePart(
}
LOG_DEBUG(
getLogger("MergeTreeDataPartCloner"),
&Poco::Logger::get("MergeTreeDataPartCloner"),
"Clone {} part {} to {}{}",
src_flushed_tmp_part ? "flushed" : "",
src_part_storage->getFullPath(),

View File

@ -3,7 +3,7 @@
#include <Processors/Sinks/SinkToStorage.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <base/types.h>
#include <Storages/MergeTree/ZooKeeperRetries.h>
#include <Common/ZooKeeper/ZooKeeperRetries.h>
#include <Common/ZooKeeper/ZooKeeperWithFaultInjection.h>
#include <Storages/MergeTree/AsyncBlockIDsCache.h>

View File

@ -12,6 +12,8 @@
#include <Interpreters/ProcessList.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/ZooKeeperRetries.h>
#include <Common/ZooKeeper/ZooKeeperWithFaultInjection.h>
#include <Common/typeid_cast.h>
#include <Columns/ColumnSet.h>
#include <Columns/ColumnConst.h>
@ -426,7 +428,30 @@ void ReadFromSystemZooKeeper::applyFilters()
void ReadFromSystemZooKeeper::fillData(MutableColumns & res_columns)
{
zkutil::ZooKeeperPtr zookeeper = context->getZooKeeper();
QueryStatusPtr query_status = context->getProcessListElement();
const auto & settings = context->getSettingsRef();
/// Use insert settings for now in order not to introduce new settings.
/// Hopefully insert settings will also be unified and replaced with some generic retry settings.
ZooKeeperRetriesInfo retries_seetings(
settings.insert_keeper_max_retries,
settings.insert_keeper_retry_initial_backoff_ms,
settings.insert_keeper_retry_max_backoff_ms);
ZooKeeperWithFaultInjection::Ptr zookeeper;
/// Handles reconnects when needed
auto get_zookeeper = [&] ()
{
if (!zookeeper || zookeeper->expired())
{
zookeeper = ZooKeeperWithFaultInjection::createInstance(
settings.insert_keeper_fault_injection_probability,
settings.insert_keeper_fault_injection_seed,
context->getZooKeeper(),
"", nullptr);
}
return zookeeper;
};
if (paths.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
@ -448,6 +473,9 @@ void ReadFromSystemZooKeeper::fillData(MutableColumns & res_columns)
std::unordered_set<String> added;
while (!paths.empty())
{
if (query_status)
query_status->checkTimeLimit();
list_tasks.clear();
std::vector<String> paths_to_list;
while (!paths.empty() && static_cast<Int64>(list_tasks.size()) < max_inflight_requests)
@ -470,7 +498,10 @@ void ReadFromSystemZooKeeper::fillData(MutableColumns & res_columns)
paths_to_list.emplace_back(task.path_corrected);
list_tasks.emplace_back(std::move(task));
}
auto list_responses = zookeeper->tryGetChildren(paths_to_list);
zkutil::ZooKeeper::MultiTryGetChildrenResponse list_responses;
ZooKeeperRetriesControl("", nullptr, retries_seetings, query_status).retryLoop(
[&]() { list_responses = get_zookeeper()->tryGetChildren(paths_to_list); });
struct GetTask
{
@ -514,7 +545,9 @@ void ReadFromSystemZooKeeper::fillData(MutableColumns & res_columns)
}
}
auto get_responses = zookeeper->tryGet(paths_to_get);
zkutil::ZooKeeper::MultiTryGetResponse get_responses;
ZooKeeperRetriesControl("", nullptr, retries_seetings, query_status).retryLoop(
[&]() { get_responses = get_zookeeper()->tryGet(paths_to_get); });
for (size_t i = 0, size = get_tasks.size(); i < size; ++i)
{

File diff suppressed because it is too large Load Diff

View File

@ -11,6 +11,10 @@ from integration_test_images import IMAGES
class Labels(metaclass=WithIter):
"""
Label names or commit tokens in normalized form
"""
DO_NOT_TEST_LABEL = "do_not_test"
NO_MERGE_COMMIT = "no_merge_commit"
NO_CI_CACHE = "no_ci_cache"
@ -111,7 +115,6 @@ class JobNames(metaclass=WithIter):
PERFORMANCE_TEST_AMD64 = "Performance Comparison"
PERFORMANCE_TEST_ARM64 = "Performance Comparison Aarch64"
SQL_LANCER_TEST = "SQLancer (release)"
SQL_LOGIC_TEST = "Sqllogic test (release)"
SQLANCER = "SQLancer (release)"
@ -132,6 +135,8 @@ class JobNames(metaclass=WithIter):
DOCS_CHECK = "Docs check"
BUGFIX_VALIDATE = "tests bugfix validate check"
MARK_RELEASE_READY = "Mark Commit Release Ready"
# dynamically update JobName with Build jobs
for attr_name in dir(Build):
@ -156,7 +161,7 @@ class DigestConfig:
@dataclass
class LabelConfig:
"""
class to configure different CI scenarious per GH label or commit message token
configures different CI scenarious per GH label
"""
run_jobs: Iterable[str] = frozenset()
@ -165,19 +170,26 @@ class LabelConfig:
@dataclass
class JobConfig:
"""
contains config parameter relevant for job execution in CI workflow
@digest - configures digest calculation for the job
@run_command - will be triggered for the job if omited in CI workflow yml
@timeout
@num_batches - sets number of batches for multi-batch job
contains config parameters for job execution in CI workflow
"""
# configures digest calculation for the job
digest: DigestConfig = field(default_factory=DigestConfig)
# will be triggered for the job if omited in CI workflow yml
run_command: str = ""
# job timeout
timeout: Optional[int] = None
# sets number of batches for multi-batch job
num_batches: int = 1
# label that enables job in CI, if set digest won't be used
run_by_label: str = ""
# to run always regardless of the job digest or/and label
run_always: bool = False
# if the job needs to be run on the release branch, including master (e.g. building packages, docker server).
# NOTE: Subsequent runs on the same branch with the similar digest are still considered skippable.
required_on_release_branch: bool = False
# job is for pr workflow only
pr_only: bool = False
@dataclass
@ -194,6 +206,7 @@ class BuildConfig:
static_binary_name: str = ""
job_config: JobConfig = field(
default_factory=lambda: JobConfig(
required_on_release_branch=True,
digest=DigestConfig(
include_paths=[
"./src",
@ -614,6 +627,8 @@ CI_CONFIG = CiConfig(
"tsan",
"msan",
"ubsan",
# skip build report jobs as not all builds will be done
"build check",
)
]
)
@ -780,15 +795,19 @@ CI_CONFIG = CiConfig(
),
},
other_jobs_configs={
JobNames.MARK_RELEASE_READY: TestConfig(
"", job_config=JobConfig(required_on_release_branch=True)
),
JobNames.DOCKER_SERVER: TestConfig(
"",
job_config=JobConfig(
required_on_release_branch=True,
digest=DigestConfig(
include_paths=[
"tests/ci/docker_server.py",
"./docker/server",
]
)
),
),
),
JobNames.DOCKER_KEEPER: TestConfig(
@ -799,7 +818,7 @@ CI_CONFIG = CiConfig(
"tests/ci/docker_server.py",
"./docker/keeper",
]
)
),
),
),
JobNames.DOCS_CHECK: TestConfig(
@ -814,11 +833,12 @@ CI_CONFIG = CiConfig(
JobNames.FAST_TEST: TestConfig(
"",
job_config=JobConfig(
pr_only=True,
digest=DigestConfig(
include_paths=["./tests/queries/0_stateless/"],
exclude_files=[".md"],
docker=["clickhouse/fasttest"],
)
),
),
),
JobNames.STYLE_CHECK: TestConfig(
@ -988,11 +1008,15 @@ CI_CONFIG = CiConfig(
),
JobNames.COMPATIBILITY_TEST: TestConfig(
Build.PACKAGE_RELEASE,
job_config=JobConfig(digest=compatibility_check_digest),
job_config=JobConfig(
required_on_release_branch=True, digest=compatibility_check_digest
),
),
JobNames.COMPATIBILITY_TEST_ARM: TestConfig(
Build.PACKAGE_AARCH64,
job_config=JobConfig(digest=compatibility_check_digest),
job_config=JobConfig(
required_on_release_branch=True, digest=compatibility_check_digest
),
),
JobNames.UNIT_TEST: TestConfig(
Build.BINARY_RELEASE, job_config=JobConfig(**unit_test_common_params) # type: ignore

View File

@ -1,6 +1,6 @@
from contextlib import contextmanager
import os
from typing import Union, Iterator
from typing import List, Union, Iterator
from pathlib import Path
@ -17,3 +17,21 @@ def cd(path: Union[Path, str]) -> Iterator[None]:
yield
finally:
os.chdir(oldpwd)
def is_hex(s):
try:
int(s, 16)
return True
except ValueError:
return False
class GHActions:
@staticmethod
def print_in_group(group_name: str, lines: Union[str, List[str]]) -> None:
lines = list(lines)
print(f"::group::{group_name}")
for line in lines:
print(line)
print("::endgroup::")

View File

@ -350,7 +350,7 @@ class CommitStatusData:
return cls.load_from_file(STATUS_FILE_PATH)
@classmethod
def is_present(cls) -> bool:
def exist(cls) -> bool:
return STATUS_FILE_PATH.is_file()
def dump_status(self) -> None:

View File

@ -29,7 +29,7 @@ from tee_popen import TeePopen
from clickhouse_helper import get_instance_type, get_instance_id
from stopwatch import Stopwatch
from build_download_helper import download_builds_filter
from report import JobReport
from report import SUCCESS, JobReport
IMAGE_NAME = "clickhouse/performance-comparison"
@ -223,7 +223,7 @@ def main():
message = message_match.group(1).strip()
# TODO: Remove me, always green mode for the first time, unless errors
status = "success"
status = SUCCESS
if "errors" in message.lower() or too_many_slow(message.lower()):
status = "failure"
# TODO: Remove until here
@ -249,7 +249,7 @@ def main():
check_name=check_name_with_group,
).dump()
if status == "error":
if status != SUCCESS:
sys.exit(1)

View File

@ -287,7 +287,10 @@ class PRInfo:
self.fetch_changed_files()
def is_master(self) -> bool:
return self.number == 0 and self.base_ref == "master"
return self.number == 0 and self.head_ref == "master"
def is_release_branch(self) -> bool:
return self.number == 0
def is_scheduled(self):
return self.event_type == EventType.SCHEDULE

View File

@ -107,6 +107,9 @@ class S3Helper:
logging.info("Upload %s to %s. Meta: %s", file_path, url, metadata)
return url
def delete_file_from_s3(self, bucket_name: str, s3_path: str) -> None:
self.client.delete_object(Bucket=bucket_name, Key=s3_path)
def upload_test_report_to_s3(self, file_path: Path, s3_path: str) -> str:
if CI:
return self._upload_file_to_s3(S3_TEST_REPORTS_BUCKET, file_path, s3_path)

293
tests/ci/test_ci_cache.py Normal file
View File

@ -0,0 +1,293 @@
#!/usr/bin/env python
from hashlib import md5
from pathlib import Path
import shutil
from typing import Dict, Set
import unittest
from ci_config import Build, JobNames
from s3_helper import S3Helper
from ci import CiCache
from digest_helper import JOB_DIGEST_LEN
from commit_status_helper import CommitStatusData
from env_helper import S3_BUILDS_BUCKET, TEMP_PATH
def _create_mock_digest_1(string):
return md5((string).encode("utf-8")).hexdigest()[:JOB_DIGEST_LEN]
def _create_mock_digest_2(string):
return md5((string + "+nonce").encode("utf-8")).hexdigest()[:JOB_DIGEST_LEN]
DIGESTS = {job: _create_mock_digest_1(job) for job in JobNames}
DIGESTS2 = {job: _create_mock_digest_2(job) for job in JobNames}
# pylint:disable=protected-access
class S3HelperTestMock(S3Helper):
def __init__(self) -> None:
super().__init__()
self.files_on_s3_paths = {} # type: Dict[str, Set[str]]
# local path which is mocking remote s3 path with ci_cache
self.mock_remote_s3_path = Path(TEMP_PATH) / "mock_s3_path"
if not self.mock_remote_s3_path.exists():
self.mock_remote_s3_path.mkdir(parents=True, exist_ok=True)
for file in self.mock_remote_s3_path.iterdir():
file.unlink()
def list_prefix(self, s3_prefix_path, bucket=S3_BUILDS_BUCKET):
assert bucket == S3_BUILDS_BUCKET
file_prefix = Path(s3_prefix_path).name
path = str(Path(s3_prefix_path).parent)
return [
path + "/" + file
for file in self.files_on_s3_paths[path]
if file.startswith(file_prefix)
]
def upload_file(self, bucket, file_path, s3_path):
assert bucket == S3_BUILDS_BUCKET
file_name = Path(file_path).name
assert (
file_name in s3_path
), f"Record file name [{file_name}] must be part of a path on s3 [{s3_path}]"
s3_path = str(Path(s3_path).parent)
if s3_path in self.files_on_s3_paths:
self.files_on_s3_paths[s3_path].add(file_name)
else:
self.files_on_s3_paths[s3_path] = set([file_name])
shutil.copy(file_path, self.mock_remote_s3_path)
def download_files(self, bucket, s3_path, file_suffix, local_directory):
assert bucket == S3_BUILDS_BUCKET
assert file_suffix == CiCache._RECORD_FILE_EXTENSION
assert local_directory == CiCache._LOCAL_CACHE_PATH
assert CiCache._S3_CACHE_PREFIX in s3_path
assert [job_type.value in s3_path for job_type in CiCache.JobType]
# copying from mock remote path to local cache
for remote_record in self.mock_remote_s3_path.glob(f"*{file_suffix}"):
destination_file = CiCache._LOCAL_CACHE_PATH / remote_record.name
shutil.copy(remote_record, destination_file)
# pylint:disable=protected-access
class TestCiCache(unittest.TestCase):
def test_cache(self):
s3_mock = S3HelperTestMock()
ci_cache = CiCache(s3_mock, DIGESTS)
# immitate another CI run is using cache
ci_cache_2 = CiCache(s3_mock, DIGESTS2)
NUM_BATCHES = 10
DOCS_JOBS_NUM = 1
assert len(set(job for job in JobNames)) == len(list(job for job in JobNames))
NONDOCS_JOBS_NUM = len(set(job for job in JobNames)) - DOCS_JOBS_NUM
PR_NUM = 123456
status = CommitStatusData(
status="success",
report_url="dummy url",
description="OK OK OK",
sha="deadbeaf2",
pr_num=PR_NUM,
)
### add some pending statuses for two batches and on non-release branch
for job in JobNames:
ci_cache.push_pending(job, [0, 1], NUM_BATCHES, release_branch=False)
ci_cache_2.push_pending(job, [0, 1], NUM_BATCHES, release_branch=False)
### add success status for 0 batch, non-release branch
for job in JobNames:
ci_cache.push_successful(job, 0, NUM_BATCHES, status, release_branch=False)
ci_cache_2.push_successful(
job, 0, NUM_BATCHES, status, release_branch=False
)
### check all expected directories were created on s3 mock
expected_build_path_1 = f"{CiCache.JobType.SRCS.value}-{_create_mock_digest_1(Build.PACKAGE_RELEASE)}"
expected_docs_path_1 = (
f"{CiCache.JobType.DOCS.value}-{_create_mock_digest_1(JobNames.DOCS_CHECK)}"
)
expected_build_path_2 = f"{CiCache.JobType.SRCS.value}-{_create_mock_digest_2(Build.PACKAGE_RELEASE)}"
expected_docs_path_2 = (
f"{CiCache.JobType.DOCS.value}-{_create_mock_digest_2(JobNames.DOCS_CHECK)}"
)
self.assertCountEqual(
list(s3_mock.files_on_s3_paths.keys()),
[
f"{CiCache._S3_CACHE_PREFIX}/{expected_build_path_1}",
f"{CiCache._S3_CACHE_PREFIX}/{expected_docs_path_1}",
f"{CiCache._S3_CACHE_PREFIX}/{expected_build_path_2}",
f"{CiCache._S3_CACHE_PREFIX}/{expected_docs_path_2}",
],
)
### check number of cache files is as expected
FILES_PER_JOB = 3 # 1 successful + 2 pending batches = 3
self.assertEqual(
len(
s3_mock.files_on_s3_paths[
f"{CiCache._S3_CACHE_PREFIX}/{expected_build_path_1}"
]
),
NONDOCS_JOBS_NUM * FILES_PER_JOB,
)
self.assertEqual(
len(
s3_mock.files_on_s3_paths[
f"{CiCache._S3_CACHE_PREFIX}/{expected_docs_path_1}"
]
),
DOCS_JOBS_NUM * FILES_PER_JOB,
)
self.assertEqual(
len(
s3_mock.files_on_s3_paths[
f"{CiCache._S3_CACHE_PREFIX}/{expected_build_path_2}"
]
),
NONDOCS_JOBS_NUM * FILES_PER_JOB,
)
self.assertEqual(
len(
s3_mock.files_on_s3_paths[
f"{CiCache._S3_CACHE_PREFIX}/{expected_docs_path_2}"
]
),
DOCS_JOBS_NUM * FILES_PER_JOB,
)
### check statuses for all jobs in cache
for job in JobNames:
self.assertEqual(
ci_cache.is_successful(job, 0, NUM_BATCHES, release_branch=False), True
)
self.assertEqual(
ci_cache.is_successful(job, 0, NUM_BATCHES, release_branch=True), False
)
self.assertEqual(
ci_cache.is_successful(
job, batch=1, num_batches=NUM_BATCHES, release_branch=False
),
False,
) # false - it's pending
self.assertEqual(
ci_cache.is_successful(
job,
batch=NUM_BATCHES,
num_batches=NUM_BATCHES,
release_branch=False,
),
False,
) # false - no such record
self.assertEqual(
ci_cache.is_pending(job, 0, NUM_BATCHES, release_branch=False), False
) # false, it's successful, success has more priority than pending
self.assertEqual(
ci_cache.is_pending(job, 1, NUM_BATCHES, release_branch=False), True
) # true
self.assertEqual(
ci_cache.is_pending(job, 1, NUM_BATCHES, release_branch=True), False
) # false, not pending job on release_branch
status2 = ci_cache.get_successful(job, 0, NUM_BATCHES)
assert status2 and status2.pr_num == PR_NUM
status2 = ci_cache.get_successful(job, 1, NUM_BATCHES)
assert status2 is None
### add some more pending statuses for two batches and for a release branch
for job in JobNames:
ci_cache.push_pending(
job, batches=[0, 1], num_batches=NUM_BATCHES, release_branch=True
)
### add success statuses for 0 batch and release branch
PR_NUM = 234
status = CommitStatusData(
status="success",
report_url="dummy url",
description="OK OK OK",
sha="deadbeaf2",
pr_num=PR_NUM,
)
for job in JobNames:
ci_cache.push_successful(job, 0, NUM_BATCHES, status, release_branch=True)
### check number of cache files is as expected
FILES_PER_JOB = 6 # 1 successful + 1 successful_release + 2 pending batches + 2 pending batches release = 6
self.assertEqual(
len(
s3_mock.files_on_s3_paths[
f"{CiCache._S3_CACHE_PREFIX}/{expected_build_path_1}"
]
),
NONDOCS_JOBS_NUM * FILES_PER_JOB,
)
self.assertEqual(
len(
s3_mock.files_on_s3_paths[
f"{CiCache._S3_CACHE_PREFIX}/{expected_docs_path_1}"
]
),
DOCS_JOBS_NUM * FILES_PER_JOB,
)
### check statuses
for job in JobNames:
self.assertEqual(ci_cache.is_successful(job, 0, NUM_BATCHES, False), True)
self.assertEqual(ci_cache.is_successful(job, 0, NUM_BATCHES, True), True)
self.assertEqual(ci_cache.is_successful(job, 1, NUM_BATCHES, False), False)
self.assertEqual(ci_cache.is_successful(job, 1, NUM_BATCHES, True), False)
self.assertEqual(
ci_cache.is_pending(job, 0, NUM_BATCHES, False), False
) # it's success, not pending
self.assertEqual(
ci_cache.is_pending(job, 0, NUM_BATCHES, True), False
) # it's success, not pending
self.assertEqual(ci_cache.is_pending(job, 1, NUM_BATCHES, False), True)
self.assertEqual(ci_cache.is_pending(job, 1, NUM_BATCHES, True), True)
status2 = ci_cache.get_successful(job, 0, NUM_BATCHES)
assert status2 and status2.pr_num == PR_NUM
status2 = ci_cache.get_successful(job, 1, NUM_BATCHES)
assert status2 is None
### create new cache object and verify the same checks
ci_cache = CiCache(s3_mock, DIGESTS)
for job in JobNames:
self.assertEqual(ci_cache.is_successful(job, 0, NUM_BATCHES, False), True)
self.assertEqual(ci_cache.is_successful(job, 0, NUM_BATCHES, True), True)
self.assertEqual(ci_cache.is_successful(job, 1, NUM_BATCHES, False), False)
self.assertEqual(ci_cache.is_successful(job, 1, NUM_BATCHES, True), False)
self.assertEqual(
ci_cache.is_pending(job, 0, NUM_BATCHES, False), False
) # it's success, not pending
self.assertEqual(
ci_cache.is_pending(job, 0, NUM_BATCHES, True), False
) # it's success, not pending
self.assertEqual(ci_cache.is_pending(job, 1, NUM_BATCHES, False), True)
self.assertEqual(ci_cache.is_pending(job, 1, NUM_BATCHES, True), True)
status2 = ci_cache.get_successful(job, 0, NUM_BATCHES)
assert status2 and status2.pr_num == PR_NUM
status2 = ci_cache.get_successful(job, 1, NUM_BATCHES)
assert status2 is None
### check some job values which are not in the cache
self.assertEqual(ci_cache.is_successful(job, 0, NUM_BATCHES + 1, False), False)
self.assertEqual(
ci_cache.is_successful(job, NUM_BATCHES - 1, NUM_BATCHES, False), False
)
self.assertEqual(ci_cache.is_pending(job, 0, NUM_BATCHES + 1, False), False)
self.assertEqual(
ci_cache.is_pending(job, NUM_BATCHES - 1, NUM_BATCHES, False), False
)
if __name__ == "__main__":
TestCiCache().test_cache()

View File

@ -0,0 +1,14 @@
<clickhouse>
<profiles>
<default>
<async_insert_use_adaptive_busy_timeout>1</async_insert_use_adaptive_busy_timeout>
</default>
</profiles>
<users>
<default>
<password></password>
<profile>default</profile>
</default>
</users>
</clickhouse>

View File

@ -0,0 +1,8 @@
<clickhouse>
<zookeeper>
<node index="1">
<host>zoo1</host>
<port>2181</port>
</node>
</zookeeper>
</clickhouse>

View File

@ -0,0 +1,372 @@
import copy
import logging
import pytest
import random
import timeit
from math import floor
from multiprocessing import Pool
from itertools import repeat
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node",
main_configs=["configs/zookeeper_config.xml"],
user_configs=[
"configs/users.xml",
],
with_zookeeper=True,
)
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
_query_settings = {"async_insert": 1, "wait_for_async_insert": 1}
def _generate_values(size, min_int, max_int, array_size_range):
gen_tuple = lambda _min_int, _max_int, _array_size_range: (
random.randint(_min_int, _max_int),
[
random.randint(_min_int, _max_int)
for _ in range(random.randint(*_array_size_range))
],
)
return map(lambda _: gen_tuple(min_int, max_int, array_size_range), range(size))
def _insert_query(table_name, settings, *args, **kwargs):
settings_s = ", ".join("{}={}".format(k, settings[k]) for k in settings)
INSERT_QUERY = "INSERT INTO {} SETTINGS {} VALUES {}"
node.query(
INSERT_QUERY.format(
table_name,
settings_s,
", ".join(map(str, _generate_values(*args, **kwargs))),
)
)
def _insert_queries_sequentially(
table_name, settings, iterations, max_values_size, array_size_range
):
for iter in range(iterations):
_insert_query(
table_name,
settings,
random.randint(1, max_values_size),
iter * max_values_size,
(iter + 1) * max_values_size - 1,
array_size_range,
)
def _insert_queries_in_parallel(
table_name, settings, thread_num, tasks, max_values_size, array_size_range
):
sizes = [random.randint(1, max_values_size) for _ in range(tasks)]
min_ints = [iter * max_values_size for iter in range(tasks)]
max_ints = [(iter + 1) * max_values_size - 1 for iter in range(tasks)]
with Pool(thread_num) as p:
p.starmap(
_insert_query,
zip(
repeat(table_name),
repeat(settings),
sizes,
min_ints,
max_ints,
repeat(array_size_range),
),
)
def test_with_merge_tree():
table_name = "async_insert_mt_table"
node.query(
"CREATE TABLE {} (a UInt64, b Array(UInt64)) ENGINE=MergeTree() ORDER BY a".format(
table_name
)
)
_insert_queries_sequentially(
table_name,
_query_settings,
iterations=100,
max_values_size=1000,
array_size_range=[10, 50],
)
node.query("DROP TABLE IF EXISTS {}".format(table_name))
def test_with_merge_tree_multithread():
thread_num = 15
table_name = "async_insert_mt_multithread_table"
node.query(
"CREATE TABLE {} (a UInt64, b Array(UInt64)) ENGINE=MergeTree() ORDER BY a".format(
table_name
)
)
_insert_queries_in_parallel(
table_name,
_query_settings,
thread_num=15,
tasks=1000,
max_values_size=1000,
array_size_range=[10, 15],
)
node.query("DROP TABLE IF EXISTS {}".format(table_name))
def test_with_replicated_merge_tree():
table_name = "async_insert_replicated_mt_table"
create_query = " ".join(
(
"CREATE TABLE {} (a UInt64, b Array(UInt64))".format(table_name),
"ENGINE=ReplicatedMergeTree('/clickhouse/tables/test/{}', 'node')".format(
table_name
),
"ORDER BY a",
)
)
node.query(create_query)
settings = _query_settings
_insert_queries_sequentially(
table_name,
settings,
iterations=100,
max_values_size=1000,
array_size_range=[10, 50],
)
node.query("DROP TABLE IF EXISTS {}".format(table_name))
def test_with_replicated_merge_tree_multithread():
thread_num = 15
table_name = "async_insert_replicated_mt_multithread_table"
create_query = " ".join(
(
"CREATE TABLE {} (a UInt64, b Array(UInt64))".format(table_name),
"ENGINE=ReplicatedMergeTree('/clickhouse/tables/test/{}', 'node')".format(
table_name
),
"ORDER BY a",
)
)
node.query(create_query)
_insert_queries_in_parallel(
table_name,
_query_settings,
thread_num=15,
tasks=1000,
max_values_size=1000,
array_size_range=[10, 15],
)
node.query("DROP TABLE IF EXISTS {}".format(table_name))
# Ensure that the combined duration of inserts with adaptive timeouts is less than
# the combined duration for fixed timeouts.
def test_compare_sequential_inserts_durations_for_adaptive_and_fixed_async_timeouts():
fixed_tm_table_name = "async_insert_mt_fixed_async_timeout"
node.query(
"CREATE TABLE {} (a UInt64, b Array(UInt64)) ENGINE=MergeTree() ORDER BY a".format(
fixed_tm_table_name
)
)
fixed_tm_settings = copy.copy(_query_settings)
fixed_tm_settings["async_insert_use_adaptive_busy_timeout"] = 0
fixed_tm_settings["async_insert_busy_timeout_ms"] = 200
fixed_tm_run_duration = timeit.timeit(
lambda: _insert_queries_sequentially(
fixed_tm_table_name,
fixed_tm_settings,
iterations=100,
max_values_size=1000,
array_size_range=[10, 50],
),
setup="pass",
number=3,
)
node.query("DROP TABLE IF EXISTS {}".format(fixed_tm_table_name))
logging.debug(
"Run duration with fixed asynchronous timeout is {} seconds".format(
fixed_tm_run_duration
)
)
adaptive_tm_table_name = "async_insert_mt_adaptive_async_timeout"
node.query(
"CREATE TABLE {} (a UInt64, b Array(UInt64)) ENGINE=MergeTree() ORDER BY a".format(
adaptive_tm_table_name
)
)
adaptive_tm_settings = copy.copy(_query_settings)
adaptive_tm_settings["async_insert_busy_timeout_min_ms"] = 10
adaptive_tm_settings["async_insert_busy_timeout_max_ms"] = 1000
adaptive_tm_run_duration = timeit.timeit(
lambda: _insert_queries_sequentially(
adaptive_tm_table_name,
adaptive_tm_settings,
iterations=100,
max_values_size=1000,
array_size_range=[10, 50],
),
setup="pass",
number=3,
)
logging.debug(
"Run duration with adaptive asynchronous timeout is {} seconds.".format(
adaptive_tm_run_duration
)
)
node.query("DROP TABLE IF EXISTS {}".format(adaptive_tm_table_name))
assert adaptive_tm_run_duration <= fixed_tm_run_duration
# Ensure that the combined duration of inserts with adaptive timeouts is less than
# the combined duration for fixed timeouts.
def test_compare_parallel_inserts_durations_for_adaptive_and_fixed_async_timeouts():
fixed_tm_table_name = "async_insert_mt_fixed_async_timeout"
node.query(
"CREATE TABLE {} (a UInt64, b Array(UInt64)) ENGINE=MergeTree() ORDER BY a".format(
fixed_tm_table_name
)
)
fixed_tm_settings = copy.copy(_query_settings)
fixed_tm_settings["async_insert_use_adaptive_busy_timeout"] = 0
fixed_tm_settings["async_insert_busy_timeout_ms"] = 200
fixed_tm_run_duration = timeit.timeit(
lambda: _insert_queries_in_parallel(
fixed_tm_table_name,
fixed_tm_settings,
thread_num=15,
tasks=1000,
max_values_size=1000,
array_size_range=[10, 50],
),
setup="pass",
number=3,
)
node.query("DROP TABLE IF EXISTS {}".format(fixed_tm_table_name))
logging.debug(
"Run duration with fixed asynchronous timeout is {} seconds".format(
fixed_tm_run_duration
)
)
adaptive_tm_table_name = "async_insert_mt_adaptive_async_timeout"
node.query(
"CREATE TABLE {} (a UInt64, b Array(UInt64)) ENGINE=MergeTree() ORDER BY a".format(
adaptive_tm_table_name
)
)
adaptive_tm_settings = copy.copy(_query_settings)
adaptive_tm_settings["async_insert_busy_timeout_min_ms"] = 10
adaptive_tm_settings["async_insert_busy_timeout_max_ms"] = 200
adaptive_tm_run_duration = timeit.timeit(
lambda: _insert_queries_in_parallel(
adaptive_tm_table_name,
adaptive_tm_settings,
thread_num=15,
tasks=100,
max_values_size=1000,
array_size_range=[10, 50],
),
setup="pass",
number=3,
)
logging.debug(
"Run duration with adaptive asynchronous timeout is {} seconds.".format(
adaptive_tm_run_duration
)
)
node.query("DROP TABLE IF EXISTS {}".format(adaptive_tm_table_name))
assert adaptive_tm_run_duration <= fixed_tm_run_duration
# Ensure that the delay converges to a minimum for sequential inserts and wait_for_async_insert=1.
def test_change_queries_frequency():
table_name = "async_insert_mt_change_queries_frequencies"
create_query = " ".join(
(
"CREATE TABLE {} (a UInt64, b Array(UInt64))".format(table_name),
"ENGINE=ReplicatedMergeTree('/clickhouse/tables/test_frequencies/{}', 'node')".format(
table_name
),
"ORDER BY a",
)
)
node.query(create_query)
settings = copy.copy(_query_settings)
min_ms = 50
settings["async_insert_busy_timeout_min_ms"] = min_ms
settings["async_insert_busy_timeout_max_ms"] = 2000
_insert_queries_in_parallel(
table_name,
settings,
thread_num=15,
tasks=2000,
max_values_size=1000,
array_size_range=[10, 15],
)
_insert_queries_sequentially(
table_name,
settings,
iterations=200,
max_values_size=1000,
array_size_range=[10, 50],
)
select_log_query = "SELECT timeout_milliseconds FROM system.asynchronous_insert_log ORDER BY event_time DESC LIMIT 50"
res = node.query(select_log_query)
for line in res.splitlines():
assert int(line) == min_ms
node.query("DROP TABLE IF EXISTS {}".format(table_name))

View File

@ -5,7 +5,9 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
url="${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=1&async_insert_busy_timeout_ms=600000&async_insert_max_query_number=3&async_insert_deduplicate=1"
# With adaptive timeout enabled, the asynchronous queue can be flushed synchronously, depending on the elapsed since the last insert.
# This may result in test flakiness.
url="${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=1&async_insert_busy_timeout_ms=600000&async_insert_max_query_number=3&async_insert_deduplicate=1&async_insert_use_adaptive_busy_timeout=0"
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS async_inserts"
${CLICKHOUSE_CLIENT} -q "CREATE TABLE async_inserts (id UInt32, s String) ENGINE = MergeTree ORDER BY id"

View File

@ -4,7 +4,9 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
url="${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=1"
# With adaptive timeout enabled, the asynchronous queue can be flushed synchronously, depending on the elapsed since the last insert.
# This may result in test flakiness.
url="${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=1&async_insert_use_adaptive_busy_timeout=0"
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS async_inserts"
${CLICKHOUSE_CLIENT} -q "CREATE TABLE async_inserts (id UInt32, s String) ENGINE = MergeTree ORDER BY id"

View File

@ -6,7 +6,8 @@ CREATE TABLE t_async_inserts_flush (a UInt64) ENGINE = Memory;
SET async_insert = 1;
SET wait_for_async_insert = 0;
SET async_insert_busy_timeout_ms = 1000000;
SET async_insert_busy_timeout_min_ms = 1000000;
SET async_insert_busy_timeout_max_ms = 10000000;
INSERT INTO t_async_inserts_flush VALUES (1) (2);
INSERT INTO t_async_inserts_flush FORMAT JSONEachRow {"a": 10} {"a": 20};

View File

@ -7,7 +7,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS 02810_async_insert_dedup_collapsing"
${CLICKHOUSE_CLIENT} -q "CREATE TABLE 02810_async_insert_dedup_collapsing (stringvalue String, sign Int8) ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/{database}/02810_async_insert_dedup', 'r1', sign) ORDER BY stringvalue"
url="${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=1&async_insert_busy_timeout_ms=3000&async_insert_deduplicate=1"
url="${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=1&async_insert_busy_timeout_ms=3000&async_insert_use_adaptive_busy_timeout=0&async_insert_deduplicate=1"
# insert value with same key and sign so it's collapsed on insert
${CLICKHOUSE_CURL} -sS "$url" -d "INSERT INTO 02810_async_insert_dedup_collapsing VALUES ('string1', 1)" &
@ -36,4 +36,4 @@ wait
${CLICKHOUSE_CLIENT} -q "SELECT stringvalue FROM 02810_async_insert_dedup_collapsing ORDER BY stringvalue"
${CLICKHOUSE_CLIENT} -q "SELECT '------------'"
${CLICKHOUSE_CLIENT} -q "DROP TABLE 02810_async_insert_dedup_collapsing"
${CLICKHOUSE_CLIENT} -q "DROP TABLE 02810_async_insert_dedup_collapsing"

View File

@ -12,7 +12,7 @@ $CLICKHOUSE_CLIENT -n -q "
CREATE TABLE t_async_insert_native_1 (id UInt64, s String) ENGINE = MergeTree ORDER BY id;
"
async_insert_options="--async_insert 1 --wait_for_async_insert 0 --async_insert_busy_timeout_ms 1000000"
async_insert_options="--async_insert 1 --wait_for_async_insert 0 --async_insert_busy_timeout_min_ms 1000000 --async_insert_busy_timeout_max_ms 10000000"
echo '{"id": 1, "s": "aaa"} {"id": 2, "s": "bbb"}' | $CLICKHOUSE_CLIENT $async_insert_options -q 'INSERT INTO t_async_insert_native_1 FORMAT JSONEachRow'
$CLICKHOUSE_CLIENT $async_insert_options -q 'INSERT INTO t_async_insert_native_1 FORMAT JSONEachRow {"id": 3, "s": "ccc"}'

View File

@ -12,7 +12,7 @@ $CLICKHOUSE_CLIENT -n -q "
CREATE TABLE t_async_insert_native_3 (id UInt64, s String) ENGINE = MergeTree ORDER BY id;
"
async_insert_options="--async_insert 1 --wait_for_async_insert 0 --async_insert_busy_timeout_ms 1000000"
async_insert_options="--async_insert 1 --wait_for_async_insert 0 --async_insert_busy_timeout_min_ms 1000000 --async_insert_busy_timeout_max_ms 10000000"
echo '{"id": 1, "s": "aaa"} {"id": 2, "s": "bbb"}' | $CLICKHOUSE_CLIENT $async_insert_options -q 'INSERT INTO t_async_insert_native_3 FORMAT JSONEachRow'
echo "(3, 'ccc') (4, 'ddd') (5, 'eee')" | $CLICKHOUSE_CLIENT $async_insert_options -q 'INSERT INTO t_async_insert_native_3 FORMAT Values'

View File

@ -9,7 +9,8 @@ ORDER BY id;
SET async_insert = 1;
SET async_insert_deduplicate = 1;
SET wait_for_async_insert = 0;
SET async_insert_busy_timeout_ms = 100000;
SET async_insert_busy_timeout_min_ms = 100000;
SET async_insert_busy_timeout_max_ms = 1000000;
SET insert_deduplication_token = '1';
SET log_comment = 'async_insert_skip_settings_1';

View File

@ -82,3 +82,12 @@ B 3 10
D 1 20
A 2 30
C \N 40
-- test SELECT * ORDER BY ALL with no "all" column in the SELECT clause
A 2 30
B 3 10
C \N 40
D 1 20
A 2 30
B 3 10
C \N 40
D 1 20

View File

@ -87,3 +87,23 @@ SET allow_experimental_analyzer = 1;
SELECT a, b, all FROM order_by_all ORDER BY all, a;
DROP TABLE order_by_all;
SELECT '-- test SELECT * ORDER BY ALL with no "all" column in the SELECT clause';
CREATE TABLE order_by_all
(
a String,
b Nullable(Int32),
c UInt64,
)
ENGINE = Memory;
INSERT INTO order_by_all VALUES ('B', 3, 10), ('C', NULL, 40), ('D', 1, 20), ('A', 2, 30);
SET allow_experimental_analyzer = 0;
SELECT * FROM order_by_all ORDER BY ALL;
SET allow_experimental_analyzer = 1;
SELECT * FROM order_by_all ORDER BY ALL;
DROP TABLE order_by_all;

View File

@ -0,0 +1,51 @@
DROP TABLE IF EXISTS async_insert_mt_test;
CREATE TABLE async_insert_mt_test (a UInt64, b Array(UInt64)) ENGINE=MergeTree() ORDER BY a;
SET async_insert_use_adaptive_busy_timeout = 1;
INSERT INTO async_insert_mt_test
SETTINGS
async_insert=1,
wait_for_async_insert=1,
async_insert_busy_timeout_min_ms=10,
async_insert_busy_timeout_max_ms=500,
async_insert_busy_timeout_increase_rate=1.0,
async_insert_busy_timeout_decrease_rate=1.0
VALUES (3, []), (1, [1, 3]), (2, [7, 8]), (4, [5, 9]), (5, [2, 6]);
INSERT INTO async_insert_mt_test
SETTINGS
async_insert=1,
wait_for_async_insert=1,
async_insert_busy_timeout_ms=500,
async_insert_busy_timeout_min_ms=500
VALUES (3, []), (1, [1, 3]), (2, [7, 8]), (4, [5, 9]), (5, [2, 6]);
INSERT INTO async_insert_mt_test
SETTINGS
async_insert=1,
wait_for_async_insert=1,
async_insert_busy_timeout_ms=100,
async_insert_busy_timeout_min_ms=500
VALUES (3, []), (1, [1, 3]), (2, [7, 8]), (4, [5, 9]), (5, [2, 6]);
INSERT INTO async_insert_mt_test
SETTINGS
async_insert=1,
wait_for_async_insert=1,
async_insert_busy_timeout_increase_rate=-1.0
VALUES (3, []), (1, [1, 3]), (2, [7, 8]), (4, [5, 9]), (5, [2, 6]); -- { serverError INVALID_SETTING_VALUE }
INSERT INTO async_insert_mt_test
SETTINGS
async_insert=1,
wait_for_async_insert=1,
async_insert_busy_timeout_decrease_rate=-1.0
VALUES (3, []), (1, [1, 3]), (2, [7, 8]), (4, [5, 9]), (5, [2, 6]); -- { serverError INVALID_SETTING_VALUE }
DROP TABLE IF EXISTS async_insert_mt_test;

View File

@ -0,0 +1,3 @@
/keeper api_version
/keeper feature_flags
1

View File

@ -0,0 +1,22 @@
-- Tags: zookeeper, no-parallel, no-fasttest
SELECT path, name
FROM system.zookeeper
WHERE path = '/keeper'
ORDER BY path, name
SETTINGS
insert_keeper_retry_initial_backoff_ms = 1,
insert_keeper_retry_max_backoff_ms = 20,
insert_keeper_fault_injection_probability=0.3,
insert_keeper_fault_injection_seed=4,
log_comment='02975_system_zookeeper_retries';
SYSTEM FLUSH LOGS;
-- Check that there where zk session failures
SELECT ProfileEvents['ZooKeeperHardwareExceptions'] > 0
FROM system.query_log
WHERE current_database = currentDatabase() AND type = 'QueryFinish' AND log_comment='02975_system_zookeeper_retries'
ORDER BY event_time_microseconds DESC
LIMIT 1;

View File

@ -59,7 +59,7 @@ int main(int argc, char *argv[])
Poco::Logger::root().setChannel(channel);
Poco::Logger::root().setLevel("trace");
}
auto logger = getLogger("keeper-dumper");
auto * logger = &Poco::Logger::get("keeper-dumper");
ResponsesQueue queue(std::numeric_limits<size_t>::max());
SnapshotsQueue snapshots_queue{1};
CoordinationSettingsPtr settings = std::make_shared<CoordinationSettings>();

View File

@ -1,6 +1,9 @@
v24.1.2.5-stable 2024-02-02
v24.1.1.2048-stable 2024-01-30
v23.12.3.40-stable 2024-02-02
v23.12.2.59-stable 2024-01-05
v23.12.1.1368-stable 2023-12-28
v23.11.5.29-stable 2024-02-02
v23.11.4.24-stable 2024-01-05
v23.11.3.23-stable 2023-12-21
v23.11.2.11-stable 2023-12-13

1 v24.1.1.2048-stable v24.1.2.5-stable 2024-01-30 2024-02-02
1 v24.1.2.5-stable 2024-02-02
2 v24.1.1.2048-stable v24.1.1.2048-stable 2024-01-30 2024-01-30
3 v23.12.3.40-stable 2024-02-02
4 v23.12.2.59-stable v23.12.2.59-stable 2024-01-05 2024-01-05
5 v23.12.1.1368-stable v23.12.1.1368-stable 2023-12-28 2023-12-28
6 v23.11.5.29-stable 2024-02-02
7 v23.11.4.24-stable v23.11.4.24-stable 2024-01-05 2024-01-05
8 v23.11.3.23-stable v23.11.3.23-stable 2023-12-21 2023-12-21
9 v23.11.2.11-stable v23.11.2.11-stable 2023-12-13 2023-12-13