Merge branch 'master' into bump-vectorscan-to-5.4.8

Robert Schulze 2022-09-15 12:46:40 +02:00 committed by GitHub
commit b4a2a4d1d0
84 changed files with 1223 additions and 452 deletions

.git-blame-ignore-revs (new file, 15 lines)

@ -0,0 +1,15 @@
# This is a file that can be used by git-blame to ignore some revisions.
# (git 2.23+, released in August 2019)
#
# Can be configured as follows:
#
# $ git config blame.ignoreRevsFile .git-blame-ignore-revs
#
# For more information you can look at git-blame(1) man page.
# Changed tabs to spaces in code [#CLICKHOUSE-3]
137ad95929ee016cc6d3c03bccb5586941c163ff
# dbms/ → src/
# (though it is unlikely that you will see it in blame)
06446b4f08a142d6f1bc30664c47ded88ab51782


@ -30,10 +30,11 @@ jobs:
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY"
# Download and push packages to artifactory
python3 ./tests/ci/push_to_artifactory.py --release "${{ github.ref }}" \
--commit '${{ github.sha }}' --artifactory-url "${{ secrets.JFROG_ARTIFACTORY_URL }}" --all
python3 ./tests/ci/push_to_artifactory.py --release '${{ github.ref }}' \
--commit '${{ github.sha }}' --artifactory-url '${{ secrets.JFROG_ARTIFACTORY_URL }}' --all
# Download macos binaries to ${{runner.temp}}/download_binary
python3 ./tests/ci/download_binary.py binary_darwin binary_darwin_aarch64
python3 ./tests/ci/download_binary.py --version '${{ github.ref }}' \
--commit '${{ github.sha }}' binary_darwin binary_darwin_aarch64
mv '${{runner.temp}}/download_binary/'clickhouse-* '${{runner.temp}}/push_to_artifactory'
- name: Upload packages to release assets
uses: svenstaro/upload-release-action@v2


@ -43,6 +43,7 @@ jobs:
GITHUB_TOKEN: ${{ secrets.ROBOT_CLICKHOUSE_COMMIT_TOKEN }}
run: |
./utils/list-versions/list-versions.sh > ./utils/list-versions/version_date.tsv
./utils/list-versions/update-docker-version.sh
GID=$(id -g "${UID}")
docker run -u "${UID}:${GID}" -e PYTHONUNBUFFERED=1 \
--volume="${GITHUB_WORKSPACE}:/ClickHouse" clickhouse/style-test \


@ -220,6 +220,35 @@ ReplxxLineReader::ReplxxLineReader(
rx.bind_key(Replxx::KEY::control('W'), [this](char32_t code) { return rx.invoke(Replxx::ACTION::KILL_TO_WHITESPACE_ON_LEFT, code); });
rx.bind_key(Replxx::KEY::meta('E'), [this](char32_t) { openEditor(); return Replxx::ACTION_RESULT::CONTINUE; });
/// readline insert-comment
auto insert_comment_action = [this](char32_t code)
{
replxx::Replxx::State state(rx.get_state());
const char * line = state.text();
const char * line_end = line + strlen(line);
std::string commented_line;
if (std::find(line, line_end, '\n') != line_end)
{
/// If the query has multiple lines, a multiline comment is used instead of
/// commenting each line separately, to make uncommenting easier (though
/// by invoking the editor it is simpler to uncomment multiple lines)
///
/// Note that using a multiline comment is OK even with nested
/// comments, since nested comments are supported.
commented_line = fmt::format("/* {} */", state.text());
}
else
{
// In the simplest case, use a single-line comment.
commented_line = fmt::format("-- {}", state.text());
}
rx.set_state(replxx::Replxx::State(commented_line.c_str(), commented_line.size()));
return rx.invoke(Replxx::ACTION::COMMIT_LINE, code);
};
rx.bind_key(Replxx::KEY::meta('#'), insert_comment_action);
}
ReplxxLineReader::~ReplxxLineReader()


@ -33,7 +33,7 @@ RUN arch=${TARGETARCH:-amd64} \
# lts / testing / prestable / etc
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="https://packages.clickhouse.com/tgz/${REPO_CHANNEL}"
ARG VERSION="20.9.3.45"
ARG VERSION="22.8.5.29"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
# user/group precreated explicitly with fixed uid/gid on purpose.


@ -21,7 +21,7 @@ RUN sed -i "s|http://archive.ubuntu.com|${apt_archive}|g" /etc/apt/sources.list
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="deb https://packages.clickhouse.com/deb ${REPO_CHANNEL} main"
ARG VERSION=22.6.1.*
ARG VERSION="22.8.5.29"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
# set non-empty deb_location_url url to create a docker image


@ -61,7 +61,7 @@ function configure
cp -rv right/config left ||:
# Start a temporary server to rename the tables
while pkill clickhouse-serv; do echo . ; sleep 1 ; done
while pkill -f clickhouse-serv ; do echo . ; sleep 1 ; done
echo all killed
set -m # Spawn temporary in its own process groups
@ -88,7 +88,7 @@ function configure
clickhouse-client --port $LEFT_SERVER_PORT --query "create database test" ||:
clickhouse-client --port $LEFT_SERVER_PORT --query "rename table datasets.hits_v1 to test.hits" ||:
while pkill clickhouse-serv; do echo . ; sleep 1 ; done
while pkill -f clickhouse-serv ; do echo . ; sleep 1 ; done
echo all killed
# Make copies of the original db for both servers. Use hardlinks instead
@ -106,7 +106,7 @@ function configure
function restart
{
while pkill clickhouse-serv; do echo . ; sleep 1 ; done
while pkill -f clickhouse-serv ; do echo . ; sleep 1 ; done
echo all killed
# Change the jemalloc settings here.
@ -1400,7 +1400,7 @@ case "$stage" in
while env kill -- -$watchdog_pid ; do sleep 1; done
# Stop the servers to free memory for the subsequent query analysis.
while pkill clickhouse-serv; do echo . ; sleep 1 ; done
while pkill -f clickhouse-serv ; do echo . ; sleep 1 ; done
echo Servers stopped.
;&
"analyze_queries")


@ -103,6 +103,7 @@ ClickHouse, Inc. does **not** maintain the tools and libraries listed below and
- [ClickHouse.Client](https://github.com/DarkWanderer/ClickHouse.Client)
- [ClickHouse.Net](https://github.com/ilyabreev/ClickHouse.Net)
- [ClickHouse.Net.Migrations](https://github.com/ilyabreev/ClickHouse.Net.Migrations)
- [Linq To DB](https://github.com/linq2db/linq2db)
- Elixir
- [Ecto](https://github.com/elixir-ecto/ecto)
- [clickhouse_ecto](https://github.com/appodeal/clickhouse_ecto)


@ -166,7 +166,7 @@ Cache **configuration settings**:
- `enable_cache_hits_threshold` - a number, which defines how many times some data needs to be read before it will be cached. Default: `0`, e.g. the data is cached at the first attempt to read it.
- `do_not_evict_index_and_mark_files` - do not evict small frequently used files according to cache policy. Default: `true`.
- `do_not_evict_index_and_mark_files` - do not evict small frequently used files according to cache policy. Default: `false`. This setting was added in version 22.8. If you used filesystem cache before this version, then it will not work on versions starting from 22.8 if this setting is set to `true`. If you want to use this setting, clear old cache created before version 22.8 before upgrading.
- `max_file_segment_size` - a maximum size of a single cache file. Default: `104857600` (100 Mb).


@ -2,10 +2,9 @@
slug: /en/operations/troubleshooting
sidebar_position: 46
sidebar_label: Troubleshooting
title: Troubleshooting
---
# Troubleshooting
- [Installation](#troubleshooting-installation-errors)
- [Connecting to the server](#troubleshooting-accepts-no-connections)
- [Query processing](#troubleshooting-does-not-process-queries)


@ -1227,6 +1227,8 @@ Result:
Function converts Unix timestamp to a calendar date and a time of a day. When there is only a single argument of [Integer](../../sql-reference/data-types/int-uint.md) type, it acts in the same way as [toDateTime](../../sql-reference/functions/type-conversion-functions.md#todatetime) and return [DateTime](../../sql-reference/data-types/datetime.md) type.
Alias: `fromUnixTimestamp`.
**Example:**
Query:


@ -32,6 +32,12 @@ SET allow_experimental_lightweight_delete = true;
An [alternative way to delete rows](./alter/delete.md) in ClickHouse is `ALTER TABLE ... DELETE`, which might be more efficient if you do bulk deletes only occasionally and don't need the operation to be applied instantly. In most use cases the new lightweight `DELETE FROM` behavior will be considerably faster.
:::warning
Even though deletes are becoming more lightweight in ClickHouse, they should still not be used as aggressively as on OLTP system. Ligthweight deletes are currently efficient for wide parts, but for compact parts they can be a heavyweight operation, and it may be better to use `ALTER TABLE` for some scenarios.
Even though deletes are becoming more lightweight in ClickHouse, they should still not be used as aggressively as on an OLTP system. Lightweight deletes are currently efficient for wide parts, but for compact parts they can be a heavyweight operation, and it may be better to use `ALTER TABLE` for some scenarios.
:::
:::note
`DELETE FROM` requires the `ALTER DELETE` privilege:
```sql
grant ALTER DELETE ON db.table to username;
```
:::
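For context, a minimal lightweight-delete session could look like the following sketch (the table name and predicate are illustrative, not taken from the changed docs):
```sql
-- Lightweight deletes are experimental in 22.8, so the setting has to be enabled first.
SET allow_experimental_lightweight_delete = true;

-- Requires the ALTER DELETE privilege on the target table (see the note above).
DELETE FROM db.visits WHERE user_id = 42;
```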


@ -6,45 +6,6 @@ sidebar_label: SYSTEM
# SYSTEM Statements
The list of available `SYSTEM` statements:
- [RELOAD EMBEDDED DICTIONARIES](#query_language-system-reload-emdedded-dictionaries)
- [RELOAD DICTIONARIES](#query_language-system-reload-dictionaries)
- [RELOAD DICTIONARY](#query_language-system-reload-dictionary)
- [RELOAD MODELS](#query_language-system-reload-models)
- [RELOAD MODEL](#query_language-system-reload-model)
- [RELOAD FUNCTIONS](#query_language-system-reload-functions)
- [RELOAD FUNCTION](#query_language-system-reload-functions)
- [DROP DNS CACHE](#query_language-system-drop-dns-cache)
- [DROP MARK CACHE](#query_language-system-drop-mark-cache)
- [DROP UNCOMPRESSED CACHE](#query_language-system-drop-uncompressed-cache)
- [DROP COMPILED EXPRESSION CACHE](#query_language-system-drop-compiled-expression-cache)
- [DROP REPLICA](#query_language-system-drop-replica)
- [FLUSH LOGS](#query_language-system-flush_logs)
- [RELOAD CONFIG](#query_language-system-reload-config)
- [SHUTDOWN](#query_language-system-shutdown)
- [KILL](#query_language-system-kill)
- [STOP DISTRIBUTED SENDS](#query_language-system-stop-distributed-sends)
- [FLUSH DISTRIBUTED](#query_language-system-flush-distributed)
- [START DISTRIBUTED SENDS](#query_language-system-start-distributed-sends)
- [STOP MERGES](#query_language-system-stop-merges)
- [START MERGES](#query_language-system-start-merges)
- [STOP TTL MERGES](#query_language-stop-ttl-merges)
- [START TTL MERGES](#query_language-start-ttl-merges)
- [STOP MOVES](#query_language-stop-moves)
- [START MOVES](#query_language-start-moves)
- [SYSTEM UNFREEZE](#query_language-system-unfreeze)
- [STOP FETCHES](#query_language-system-stop-fetches)
- [START FETCHES](#query_language-system-start-fetches)
- [STOP REPLICATED SENDS](#query_language-system-start-replicated-sends)
- [START REPLICATED SENDS](#query_language-system-start-replicated-sends)
- [STOP REPLICATION QUEUES](#query_language-system-stop-replication-queues)
- [START REPLICATION QUEUES](#query_language-system-start-replication-queues)
- [SYNC REPLICA](#query_language-system-sync-replica)
- [RESTART REPLICA](#query_language-system-restart-replica)
- [RESTORE REPLICA](#query_language-system-restore-replica)
- [RESTART REPLICAS](#query_language-system-restart-replicas)
## RELOAD EMBEDDED DICTIONARIES
Reload all [Internal dictionaries](../../sql-reference/dictionaries/internal-dicts.md).
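A hedged usage sketch of this statement (standard `SYSTEM` syntax, not part of the diff):
```sql
-- Reload the built-in (embedded) dictionaries; by default they are disabled.
SYSTEM RELOAD EMBEDDED DICTIONARIES;
```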


@ -6,43 +6,6 @@ sidebar_label: SYSTEM
# SYSTEM Queries {#query-language-system}
- [RELOAD EMBEDDED DICTIONARIES](#query_language-system-reload-emdedded-dictionaries)
- [RELOAD DICTIONARIES](#query_language-system-reload-dictionaries)
- [RELOAD DICTIONARY](#query_language-system-reload-dictionary)
- [RELOAD MODELS](#query_language-system-reload-models)
- [RELOAD MODEL](#query_language-system-reload-model)
- [RELOAD FUNCTIONS](#query_language-system-reload-functions)
- [RELOAD FUNCTION](#query_language-system-reload-functions)
- [DROP DNS CACHE](#query_language-system-drop-dns-cache)
- [DROP MARK CACHE](#query_language-system-drop-mark-cache)
- [DROP UNCOMPRESSED CACHE](#query_language-system-drop-uncompressed-cache)
- [DROP COMPILED EXPRESSION CACHE](#query_language-system-drop-compiled-expression-cache)
- [DROP REPLICA](#query_language-system-drop-replica)
- [FLUSH LOGS](#query_language-system-flush_logs)
- [RELOAD CONFIG](#query_language-system-reload-config)
- [SHUTDOWN](#query_language-system-shutdown)
- [KILL](#query_language-system-kill)
- [STOP DISTRIBUTED SENDS](#query_language-system-stop-distributed-sends)
- [FLUSH DISTRIBUTED](#query_language-system-flush-distributed)
- [START DISTRIBUTED SENDS](#query_language-system-start-distributed-sends)
- [STOP MERGES](#query_language-system-stop-merges)
- [START MERGES](#query_language-system-start-merges)
- [STOP TTL MERGES](#query_language-stop-ttl-merges)
- [START TTL MERGES](#query_language-start-ttl-merges)
- [STOP MOVES](#query_language-stop-moves)
- [START MOVES](#query_language-start-moves)
- [SYSTEM UNFREEZE](#query_language-system-unfreeze)
- [STOP FETCHES](#query_language-system-stop-fetches)
- [START FETCHES](#query_language-system-start-fetches)
- [STOP REPLICATED SENDS](#query_language-system-start-replicated-sends)
- [START REPLICATED SENDS](#query_language-system-start-replicated-sends)
- [STOP REPLICATION QUEUES](#query_language-system-stop-replication-queues)
- [START REPLICATION QUEUES](#query_language-system-start-replication-queues)
- [SYNC REPLICA](#query_language-system-sync-replica)
- [RESTART REPLICA](#query_language-system-restart-replica)
- [RESTORE REPLICA](#query_language-system-restore-replica)
- [RESTART REPLICAS](#query_language-system-restart-replicas)
## RELOAD EMBEDDED DICTIONARIES {#query_language-system-reload-emdedded-dictionaries}
Reloads all [Internal dictionaries](../dictionaries/internal-dicts.md).
By default, internal dictionaries are disabled.


@ -6,38 +6,6 @@ sidebar_label: SYSTEM
# SYSTEM Queries {#query-language-system}
- [RELOAD EMBEDDED DICTIONARIES](#query_language-system-reload-emdedded-dictionaries)
- [RELOAD DICTIONARIES](#query_language-system-reload-dictionaries)
- [RELOAD DICTIONARY](#query_language-system-reload-dictionary)
- [DROP DNS CACHE](#query_language-system-drop-dns-cache)
- [DROP MARK CACHE](#query_language-system-drop-mark-cache)
- [DROP UNCOMPRESSED CACHE](#query_language-system-drop-uncompressed-cache)
- [DROP COMPILED EXPRESSION CACHE](#query_language-system-drop-compiled-expression-cache)
- [DROP REPLICA](#query_language-system-drop-replica)
- [FLUSH LOGS](#query_language-system-flush_logs)
- [RELOAD CONFIG](#query_language-system-reload-config)
- [SHUTDOWN](#query_language-system-shutdown)
- [KILL](#query_language-system-kill)
- [STOP DISTRIBUTED SENDS](#query_language-system-stop-distributed-sends)
- [FLUSH DISTRIBUTED](#query_language-system-flush-distributed)
- [START DISTRIBUTED SENDS](#query_language-system-start-distributed-sends)
- [STOP MERGES](#query_language-system-stop-merges)
- [START MERGES](#query_language-system-start-merges)
- [STOP TTL MERGES](#query_language-stop-ttl-merges)
- [START TTL MERGES](#query_language-start-ttl-merges)
- [STOP MOVES](#query_language-stop-moves)
- [START MOVES](#query_language-start-moves)
- [SYSTEM UNFREEZE](#query_language-system-unfreeze)
- [STOP FETCHES](#query_language-system-stop-fetches)
- [START FETCHES](#query_language-system-start-fetches)
- [STOP REPLICATED SENDS](#query_language-system-start-replicated-sends)
- [START REPLICATED SENDS](#query_language-system-start-replicated-sends)
- [STOP REPLICATION QUEUES](#query_language-system-stop-replication-queues)
- [START REPLICATION QUEUES](#query_language-system-start-replication-queues)
- [SYNC REPLICA](#query_language-system-sync-replica)
- [RESTART REPLICA](#query_language-system-restart-replica)
- [RESTART REPLICAS](#query_language-system-restart-replicas)
## RELOAD EMBEDDED DICTIONARIES {#query_language-system-reload-emdedded-dictionaries}
Reloads all [internal dictionaries](../../sql-reference/dictionaries/internal-dicts.md). By default, internal dictionaries are disabled.


@ -47,9 +47,10 @@ CLICKHOUSE_PIDFILE="$CLICKHOUSE_PIDDIR/$PROGRAM.pid"
# Some systems lack "flock"
command -v flock >/dev/null && FLOCK=flock
# Override defaults from optional config file
# Override defaults from optional config file and export them automatically
set -a
test -f /etc/default/clickhouse && . /etc/default/clickhouse
set +a
die()
{


@ -1080,6 +1080,20 @@ bool ClientBase::receiveSampleBlock(Block & out, ColumnsDescription & columns_de
}
void ClientBase::setInsertionTable(const ASTInsertQuery & insert_query)
{
if (!global_context->hasInsertionTable() && insert_query.table)
{
String table = insert_query.table->as<ASTIdentifier &>().shortName();
if (!table.empty())
{
String database = insert_query.database ? insert_query.database->as<ASTIdentifier &>().shortName() : "";
global_context->setInsertionTable(StorageID(database, table));
}
}
}
void ClientBase::processInsertQuery(const String & query_to_execute, ASTPtr parsed_query)
{
auto query = query_to_execute;
@ -1129,6 +1143,8 @@ void ClientBase::processInsertQuery(const String & query_to_execute, ASTPtr pars
{
/// If structure was received (thus, server has not thrown an exception),
/// send our data with that structure.
setInsertionTable(parsed_insert_query);
sendData(sample, columns_description, parsed_query);
receiveEndOfQuery();
}


@ -113,6 +113,8 @@ protected:
std::vector<Arguments> & external_tables_arguments,
std::vector<Arguments> & hosts_and_ports_arguments) = 0;
void setInsertionTable(const ASTInsertQuery & insert_query);
private:
void receiveResult(ASTPtr parsed_query);


@ -130,16 +130,15 @@ void SpanHolder::finish() noexcept
try
{
auto log = current_thread_trace_context.span_log.lock();
if (!log)
/// The log might be disabled, check it before use
if (log)
{
// The log might be disabled.
return;
this->finish_time_us
= std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
log->add(OpenTelemetrySpanLogElement(*this));
}
this->finish_time_us
= std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
log->add(OpenTelemetrySpanLogElement(*this));
}
catch (...)
{


@ -264,6 +264,18 @@ protected:
}
};
/// Schedule jobs/tasks on the global thread pool without implicitly passing the tracing context of the current thread to the underlying worker as the parent tracing context.
///
/// If you implement your own job/task scheduling on top of the global thread pool, or schedule a long-running job in an infinite loop,
/// you need to use this class, or ThreadFromGlobalPool below.
///
/// See the comments of ThreadPool below to know how it works.
using ThreadFromGlobalPoolNoTracingContextPropagation = ThreadFromGlobalPoolImpl<false>;
/// An alias for a thread that executes jobs/tasks on the global thread pool, implicitly passing the tracing context of the current thread to the underlying worker as the parent tracing context.
/// If jobs/tasks are scheduled directly via the APIs of the global thread pool, you need to use this class, or the class above.
using ThreadFromGlobalPool = ThreadFromGlobalPoolImpl<true>;
/// Recommended thread pool for the case when multiple thread pools are created and destroyed.
///
/// The template parameter of ThreadFromGlobalPool is set to false to disable tracing context propagation to underlying worker.
@ -274,9 +286,6 @@ protected:
/// which means the tracing context initialized at underlying worker level won't be delete for a very long time.
/// This would cause wrong context for further jobs scheduled in ThreadPool.
///
/// To make sure the tracing context are correctly propagated, we explicitly disable context propagation(including initialization and de-initialization) at underlying worker level.
/// To make sure the tracing context is correctly propagated, we explicitly disable context propagation(including initialization and de-initialization) at underlying worker level.
///
using ThreadPool = ThreadPoolImpl<ThreadFromGlobalPoolImpl<false>>;
/// An alias for user code to execute a job in the global thread pool
using ThreadFromGlobalPool = ThreadFromGlobalPoolImpl<true>;
using ThreadPool = ThreadPoolImpl<ThreadFromGlobalPoolNoTracingContextPropagation>;


@ -30,6 +30,7 @@ struct Settings;
M(UInt64, snapshot_distance, 100000, "How many log items we have to collect to write new snapshot", 0) \
M(Bool, auto_forwarding, true, "Allow to forward write requests from followers to leader", 0) \
M(Milliseconds, shutdown_timeout, 5000, "How much time we will wait until RAFT shutdown", 0) \
M(Milliseconds, session_shutdown_timeout, 10000, "How much time we will wait until sessions are closed during shutdown", 0) \
M(Milliseconds, startup_timeout, 180000, "How much time we will wait until RAFT to start.", 0) \
M(LogsLevel, raft_logs_level, LogsLevel::information, "Log internal RAFT logs into main server log level. Valid values: 'trace', 'debug', 'information', 'warning', 'error', 'fatal', 'none'", 0) \
M(UInt64, rotate_log_storage_interval, 100000, "How many records will be stored in one log storage file", 0) \


@ -354,9 +354,6 @@ void KeeperDispatcher::shutdown()
update_configuration_thread.join();
}
if (server)
server->shutdown();
KeeperStorage::RequestForSession request_for_session;
/// Set session expired for all pending requests
@ -368,10 +365,58 @@ void KeeperDispatcher::shutdown()
setResponse(request_for_session.session_id, response);
}
/// Clear all registered sessions
std::lock_guard lock(session_to_response_callback_mutex);
session_to_response_callback.clear();
KeeperStorage::RequestsForSessions close_requests;
{
/// Clear all registered sessions
std::lock_guard lock(session_to_response_callback_mutex);
if (hasLeader())
{
close_requests.reserve(session_to_response_callback.size());
// send to leader CLOSE requests for active sessions
for (const auto & [session, response] : session_to_response_callback)
{
auto request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close);
request->xid = Coordination::CLOSE_XID;
using namespace std::chrono;
KeeperStorage::RequestForSession request_info
{
.session_id = session,
.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(),
.request = std::move(request),
};
close_requests.push_back(std::move(request_info));
}
}
session_to_response_callback.clear();
}
// if there is no leader, there is no reason to do CLOSE because it's a write request
if (hasLeader() && !close_requests.empty())
{
LOG_INFO(log, "Trying to close {} session(s)", close_requests.size());
const auto raft_result = server->putRequestBatch(close_requests);
auto sessions_closing_done_promise = std::make_shared<std::promise<void>>();
auto sessions_closing_done = sessions_closing_done_promise->get_future();
raft_result->when_ready([sessions_closing_done_promise = std::move(sessions_closing_done_promise)](
nuraft::cmd_result<nuraft::ptr<nuraft::buffer>> & /*result*/,
nuraft::ptr<std::exception> & /*exception*/) { sessions_closing_done_promise->set_value(); });
auto session_shutdown_timeout = configuration_and_settings->coordination_settings->session_shutdown_timeout.totalMilliseconds();
if (sessions_closing_done.wait_for(std::chrono::milliseconds(session_shutdown_timeout)) != std::future_status::ready)
LOG_WARNING(
log,
"Failed to close sessions in {}ms. If they are not closed, they will be closed after session timeout.",
session_shutdown_timeout);
}
if (server)
server->shutdown();
CurrentMetrics::set(CurrentMetrics::KeeperAliveConnections, 0);
}
catch (...)
{
@ -418,13 +463,15 @@ void KeeperDispatcher::sessionCleanerTask()
LOG_INFO(log, "Found dead session {}, will try to close it", dead_session);
/// Close session == send close request to raft server
Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close);
auto request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close);
request->xid = Coordination::CLOSE_XID;
KeeperStorage::RequestForSession request_info;
request_info.request = request;
using namespace std::chrono;
request_info.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
request_info.session_id = dead_session;
KeeperStorage::RequestForSession request_info
{
.session_id = dead_session,
.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(),
.request = std::move(request),
};
{
std::lock_guard lock(push_request_mutex);
if (!requests_queue->push(std::move(request_info)))


@ -149,9 +149,9 @@ BackgroundSchedulePool::BackgroundSchedulePool(size_t size_, CurrentMetrics::Met
threads.resize(size_);
for (auto & thread : threads)
thread = ThreadFromGlobalPool([this] { threadFunction(); });
thread = ThreadFromGlobalPoolNoTracingContextPropagation([this] { threadFunction(); });
delayed_thread = ThreadFromGlobalPool([this] { delayExecutionThreadFunction(); });
delayed_thread = ThreadFromGlobalPoolNoTracingContextPropagation([this] { delayExecutionThreadFunction(); });
}
@ -168,7 +168,7 @@ void BackgroundSchedulePool::increaseThreadsCount(size_t new_threads_count)
threads.resize(new_threads_count);
for (size_t i = old_threads_count; i < new_threads_count; ++i)
threads[i] = ThreadFromGlobalPool([this] { threadFunction(); });
threads[i] = ThreadFromGlobalPoolNoTracingContextPropagation([this] { threadFunction(); });
}


@ -57,7 +57,9 @@ public:
~BackgroundSchedulePool();
private:
using Threads = std::vector<ThreadFromGlobalPool>;
/// BackgroundSchedulePool schedules tasks on its own task queue; there's no need to construct/restore the tracing context at this level.
/// This is also how the ThreadPool class treats the tracing context. See ThreadPool for more information.
using Threads = std::vector<ThreadFromGlobalPoolNoTracingContextPropagation>;
void threadFunction();
void delayExecutionThreadFunction();
@ -83,7 +85,7 @@ private:
std::condition_variable delayed_tasks_cond_var;
std::mutex delayed_tasks_mutex;
/// Thread waiting for next delayed task.
ThreadFromGlobalPool delayed_thread;
ThreadFromGlobalPoolNoTracingContextPropagation delayed_thread;
/// Tasks ordered by scheduled time.
DelayedTasks delayed_tasks;


@ -777,6 +777,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
\
M(UInt64, input_format_allow_errors_num, 0, "Maximum absolute amount of errors while reading text formats (like CSV, TSV). In case of error, if at least absolute or relative amount of errors is lower than corresponding value, will skip until next line and continue.", 0) \
M(Float, input_format_allow_errors_ratio, 0, "Maximum relative amount of errors while reading text formats (like CSV, TSV). In case of error, if at least absolute or relative amount of errors is lower than corresponding value, will skip until next line and continue.", 0) \
M(String, input_format_record_errors_file_path, "", "Path of the file used to record errors while reading text formats (CSV, TSV).", 0) \
M(String, errors_output_format, "CSV", "Method to write Errors to text output.", 0) \
\
M(String, format_schema, "", "Schema identifier (used by schema-based formats)", 0) \
M(String, format_template_resultset, "", "Path to file which contains format string for result set (for Template format)", 0) \
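The two new settings above (`input_format_record_errors_file_path`, `errors_output_format`) are ordinary query-level settings. A hedged sketch of how they might be enabled in a client session follows; the file name is hypothetical, and in server mode the path is resolved under the server's user files directory:
```sql
-- Record rows of text formats (CSV, TSV) that fail to parse into a file, serialized as CSV.
SET input_format_record_errors_file_path = 'insert_errors.csv';
SET errors_output_format = 'CSV';
-- Some error tolerance is still required; otherwise the first bad row aborts the query
-- before anything is recorded.
SET input_format_allow_errors_num = 10;
```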


@ -1,3 +1,4 @@
#include <Storages/StorageSnapshot.h>
#include <DataTypes/ObjectUtils.h>
#include <DataTypes/DataTypeObject.h>
#include <DataTypes/DataTypeNothing.h>
@ -159,6 +160,16 @@ void convertObjectsToTuples(Block & block, const NamesAndTypesList & extended_st
}
}
void deduceTypesOfObjectColumns(const StorageSnapshotPtr & storage_snapshot, Block & block)
{
if (!storage_snapshot->object_columns.empty())
{
auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical).withExtendedObjects();
auto storage_columns = storage_snapshot->getColumns(options);
convertObjectsToTuples(block, storage_columns);
}
}
static bool isPrefix(const PathInData::Parts & prefix, const PathInData::Parts & parts)
{
if (prefix.size() > parts.size())


@ -11,6 +11,9 @@
namespace DB
{
struct StorageSnapshot;
using StorageSnapshotPtr = std::shared_ptr<StorageSnapshot>;
/// Returns number of dimensions in Array type. 0 if type is not array.
size_t getNumberOfDimensions(const IDataType & type);
@ -38,6 +41,7 @@ DataTypePtr getDataTypeByColumn(const IColumn & column);
/// Converts Object types and columns to Tuples in @columns_list and @block
/// and checks that types are consistent with types in @extended_storage_columns.
void convertObjectsToTuples(Block & block, const NamesAndTypesList & extended_storage_columns);
void deduceTypesOfObjectColumns(const StorageSnapshotPtr & storage_snapshot, Block & block);
/// Checks that each path is not the prefix of any other path.
void checkObjectHasNoAmbiguosPaths(const PathsInData & paths);
@ -164,27 +168,24 @@ ColumnsDescription getObjectColumns(
const ColumnsDescription & storage_columns,
EntryColumnsGetter && entry_columns_getter)
{
ColumnsDescription res;
if (begin == end)
{
for (const auto & column : storage_columns)
{
if (isObject(column.type))
{
auto tuple_type = std::make_shared<DataTypeTuple>(
DataTypes{std::make_shared<DataTypeUInt8>()},
Names{ColumnObject::COLUMN_NAME_DUMMY});
res.add({column.name, std::move(tuple_type)});
}
}
return res;
}
std::unordered_map<String, DataTypes> types_in_entries;
/// Add dummy column for all Object columns
/// to not lose any column if it's missing
/// in all entries. If it exists in any entry
/// dummy column will be removed.
for (const auto & column : storage_columns)
{
if (isObject(column.type))
{
auto tuple_type = std::make_shared<DataTypeTuple>(
DataTypes{std::make_shared<DataTypeUInt8>()},
Names{ColumnObject::COLUMN_NAME_DUMMY});
types_in_entries[column.name].push_back(std::move(tuple_type));
}
}
for (auto it = begin; it != end; ++it)
{
const auto & entry_columns = entry_columns_getter(*it);
@ -196,6 +197,7 @@ ColumnsDescription getObjectColumns(
}
}
ColumnsDescription res;
for (const auto & [name, types] : types_in_entries)
res.add({name, getLeastCommonTypeForObject(types)});


@ -143,9 +143,11 @@ void CachedOnDiskReadBufferFromFile::initialize(size_t offset, size_t size)
}
CachedOnDiskReadBufferFromFile::ImplementationBufferPtr
CachedOnDiskReadBufferFromFile::getCacheReadBuffer(size_t offset) const
CachedOnDiskReadBufferFromFile::getCacheReadBuffer(const FileSegment & file_segment) const
{
auto path = cache->getPathInLocalCache(cache_key, offset, is_persistent);
/// Use is_persistent flag from in-memory state of the filesegment,
/// because it is consistent with what is written on disk.
auto path = file_segment.getPathInLocalCache();
ReadSettings local_read_settings{settings};
/// Do not allow to use asynchronous version of LocalFSReadMethod.
@ -206,7 +208,7 @@ CachedOnDiskReadBufferFromFile::getRemoteFSReadBuffer(FileSegment & file_segment
return remote_file_reader;
auto remote_fs_segment_reader = file_segment.extractRemoteFileReader();
if (remote_fs_segment_reader && file_offset_of_buffer_end == remote_file_reader->getFileOffsetOfBufferEnd())
if (remote_fs_segment_reader && file_offset_of_buffer_end == remote_fs_segment_reader->getFileOffsetOfBufferEnd())
remote_file_reader = remote_fs_segment_reader;
else
remote_file_reader = implementation_buffer_creator();
@ -237,8 +239,6 @@ bool CachedOnDiskReadBufferFromFile::canStartFromCache(size_t current_offset, co
CachedOnDiskReadBufferFromFile::ImplementationBufferPtr
CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & file_segment)
{
auto range = file_segment->range();
auto download_state = file_segment->state();
LOG_TEST(log, "getReadBufferForFileSegment: {}", file_segment->getInfoForLog());
@ -247,7 +247,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & fil
if (download_state == FileSegment::State::DOWNLOADED)
{
read_type = ReadType::CACHED;
return getCacheReadBuffer(range.left);
return getCacheReadBuffer(*file_segment);
}
else
{
@ -280,7 +280,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & fil
/// file_offset_of_buffer_end
read_type = ReadType::CACHED;
return getCacheReadBuffer(range.left);
return getCacheReadBuffer(*file_segment);
}
download_state = file_segment->wait();
@ -289,7 +289,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & fil
case FileSegment::State::DOWNLOADED:
{
read_type = ReadType::CACHED;
return getCacheReadBuffer(range.left);
return getCacheReadBuffer(*file_segment);
}
case FileSegment::State::EMPTY:
case FileSegment::State::PARTIALLY_DOWNLOADED:
@ -305,7 +305,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & fil
/// file_offset_of_buffer_end
read_type = ReadType::CACHED;
return getCacheReadBuffer(range.left);
return getCacheReadBuffer(*file_segment);
}
auto downloader_id = file_segment->getOrSetDownloader();
@ -323,7 +323,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & fil
read_type = ReadType::CACHED;
file_segment->resetDownloader();
return getCacheReadBuffer(range.left);
return getCacheReadBuffer(*file_segment);
}
if (file_segment->getCurrentWriteOffset() < file_offset_of_buffer_end)
@ -339,7 +339,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & fil
LOG_TEST(log, "Predownload. File segment info: {}", file_segment->getInfoForLog());
chassert(file_offset_of_buffer_end > file_segment->getCurrentWriteOffset());
bytes_to_predownload = file_offset_of_buffer_end - file_segment->getCurrentWriteOffset();
chassert(bytes_to_predownload < range.size());
chassert(bytes_to_predownload < file_segment->range().size());
}
read_type = ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE;
@ -354,7 +354,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & fil
if (canStartFromCache(file_offset_of_buffer_end, *file_segment))
{
read_type = ReadType::CACHED;
return getCacheReadBuffer(range.left);
return getCacheReadBuffer(*file_segment);
}
else
{


@ -68,7 +68,7 @@ private:
ImplementationBufferPtr getReadBufferForFileSegment(FileSegmentPtr & file_segment);
ImplementationBufferPtr getCacheReadBuffer(size_t offset) const;
ImplementationBufferPtr getCacheReadBuffer(const FileSegment & file_segment) const;
std::optional<size_t> getLastNonDownloadedOffset() const;


@ -13,7 +13,6 @@ namespace DB
namespace ErrorCodes
{
extern const int UNKNOWN_FORMAT;
extern const int LOGICAL_ERROR;
}
void DiskObjectStorageMetadata::deserialize(ReadBuffer & buf)
@ -131,9 +130,6 @@ DiskObjectStorageMetadata::DiskObjectStorageMetadata(
void DiskObjectStorageMetadata::addObject(const String & path, size_t size)
{
if (!object_storage_root_path.empty() && path.starts_with(object_storage_root_path))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected relative path");
total_size += size;
storage_objects.emplace_back(path, size);
}


@ -31,7 +31,6 @@
#include <Common/logger_useful.h>
#include <Common/MultiVersion.h>
namespace DB
{
@ -91,19 +90,7 @@ void logIfError(const Aws::Utils::Outcome<Result, Error> & response, std::functi
std::string S3ObjectStorage::generateBlobNameForPath(const std::string & /* path */)
{
/// Path to store the new S3 object.
/// Total length is 32 a-z characters for enough randomness.
/// First 3 characters are used as a prefix for
/// https://aws.amazon.com/premiumsupport/knowledge-center/s3-object-key-naming-pattern/
constexpr size_t key_name_total_size = 32;
constexpr size_t key_name_prefix_size = 3;
/// Path to store new S3 object.
return fmt::format("{}/{}",
getRandomASCIIString(key_name_prefix_size),
getRandomASCIIString(key_name_total_size - key_name_prefix_size));
return getRandomASCIIString(32);
}
Aws::S3::Model::HeadObjectOutcome S3ObjectStorage::requestObjectHeadData(const std::string & bucket_from, const std::string & key) const


@ -3,7 +3,6 @@
#include <string>
#include <Disks/ObjectStorages/IObjectStorage_fwd.h>
namespace DB
{


@ -243,11 +243,20 @@ InputFormatPtr FormatFactory::getInput(
ParallelParsingInputFormat::Params params{
buf, sample, parser_creator, file_segmentation_engine, name, settings.max_threads, settings.min_chunk_bytes_for_parallel_parsing,
context->getApplicationType() == Context::ApplicationType::SERVER};
return std::make_shared<ParallelParsingInputFormat>(params);
auto format = std::make_shared<ParallelParsingInputFormat>(params);
if (!settings.input_format_record_errors_file_path.toString().empty())
{
format->setErrorsLogger(std::make_shared<ParallelInputFormatErrorsLogger>(context));
}
return format;
}
auto format = getInputFormat(name, buf, sample, context, max_block_size, format_settings);
if (!settings.input_format_record_errors_file_path.toString().empty())
{
format->setErrorsLogger(std::make_shared<InputFormatErrorsLogger>(context));
}
return format;
}


@ -612,6 +612,7 @@ public:
void killCurrentQuery();
bool hasInsertionTable() const { return !insertion_table.empty(); }
void setInsertionTable(StorageID db_and_table) { insertion_table = std::move(db_and_table); }
const StorageID & getInsertionTable() const { return insertion_table; }


@ -28,6 +28,11 @@ namespace CurrentMetrics
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace
{
@ -84,10 +89,13 @@ void SortedBlocksWriter::insert(Block && block)
size_t bytes = 0;
size_t flush_no = 0;
if (!block.rows())
return;
{
std::lock_guard lock{insert_mutex};
/// insert bock into BlocksList undef lock
/// insert block into BlocksList under lock
inserted_blocks.insert(std::move(block));
size_t total_row_count = inserted_blocks.row_count + row_count_in_flush;
@ -145,7 +153,7 @@ SortedBlocksWriter::TmpFilePtr SortedBlocksWriter::flush(const BlocksList & bloc
pipes.emplace_back(std::make_shared<SourceFromSingleChunk>(block.cloneEmpty(), Chunk(block.getColumns(), num_rows)));
if (pipes.empty())
return {};
throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty block");
QueryPipelineBuilder pipeline;
pipeline.init(Pipe::unitePipes(std::move(pipes)));


@ -69,6 +69,8 @@ struct StorageID
return uuid != UUIDHelpers::Nil;
}
bool hasDatabase() const { return !database_name.empty(); }
bool operator<(const StorageID & rhs) const;
bool operator==(const StorageID & rhs) const;


@ -838,101 +838,117 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
{
QueryStatus * process_list_elem = context->getProcessListElement();
if (!process_list_elem)
return;
/// Update performance counters before logging to query_log
CurrentThread::finalizePerformanceCounters();
QueryStatusInfo info = process_list_elem->getInfo(true, context->getSettingsRef().log_profile_events);
double elapsed_seconds = info.elapsed_seconds;
elem.type = QueryLogElementType::QUERY_FINISH;
// construct event_time and event_time_microseconds using the same time point
// so that the two times will always be equal up to a precision of a second.
const auto finish_time = std::chrono::system_clock::now();
elem.event_time = time_in_seconds(finish_time);
elem.event_time_microseconds = time_in_microseconds(finish_time);
status_info_to_query_log(elem, info, ast, context);
if (pulling_pipeline)
if (process_list_elem)
{
query_pipeline.tryGetResultRowsAndBytes(elem.result_rows, elem.result_bytes);
}
else /// will be used only for ordinary INSERT queries
{
auto progress_out = process_list_elem->getProgressOut();
elem.result_rows = progress_out.written_rows;
elem.result_bytes = progress_out.written_bytes;
}
/// Update performance counters before logging to query_log
CurrentThread::finalizePerformanceCounters();
auto progress_callback = context->getProgressCallback();
if (progress_callback)
{
Progress p(WriteProgress{info.written_rows, info.written_bytes});
p.incrementPiecewiseAtomically(Progress{ResultProgress{elem.result_rows, elem.result_bytes}});
progress_callback(p);
}
QueryStatusInfo info = process_list_elem->getInfo(true, context->getSettingsRef().log_profile_events);
if (elem.read_rows != 0)
{
LOG_INFO(&Poco::Logger::get("executeQuery"), "Read {} rows, {} in {} sec., {} rows/sec., {}/sec.",
elem.read_rows, ReadableSize(elem.read_bytes), elapsed_seconds,
static_cast<size_t>(elem.read_rows / elapsed_seconds),
ReadableSize(elem.read_bytes / elapsed_seconds));
}
double elapsed_seconds = info.elapsed_seconds;
if (log_queries && elem.type >= log_queries_min_type && static_cast<Int64>(elem.query_duration_ms) >= log_queries_min_query_duration_ms)
{
if (auto query_log = context->getQueryLog())
query_log->add(elem);
}
if (log_processors_profiles)
{
if (auto processors_profile_log = context->getProcessorsProfileLog())
elem.type = QueryLogElementType::QUERY_FINISH;
// construct event_time and event_time_microseconds using the same time point
// so that the two times will always be equal up to a precision of a second.
const auto finish_time = std::chrono::system_clock::now();
elem.event_time = time_in_seconds(finish_time);
elem.event_time_microseconds = time_in_microseconds(finish_time);
status_info_to_query_log(elem, info, ast, context);
if (pulling_pipeline)
{
ProcessorProfileLogElement processor_elem;
processor_elem.event_time = time_in_seconds(finish_time);
processor_elem.event_time_microseconds = time_in_microseconds(finish_time);
processor_elem.query_id = elem.client_info.current_query_id;
query_pipeline.tryGetResultRowsAndBytes(elem.result_rows, elem.result_bytes);
}
else /// will be used only for ordinary INSERT queries
{
auto progress_out = process_list_elem->getProgressOut();
elem.result_rows = progress_out.written_rows;
elem.result_bytes = progress_out.written_bytes;
}
auto get_proc_id = [](const IProcessor & proc) -> UInt64
{
return reinterpret_cast<std::uintptr_t>(&proc);
};
auto progress_callback = context->getProgressCallback();
if (progress_callback)
{
Progress p(WriteProgress{info.written_rows, info.written_bytes});
p.incrementPiecewiseAtomically(Progress{ResultProgress{elem.result_rows, elem.result_bytes}});
progress_callback(p);
}
for (const auto & processor : query_pipeline.getProcessors())
if (elem.read_rows != 0)
{
LOG_INFO(&Poco::Logger::get("executeQuery"), "Read {} rows, {} in {} sec., {} rows/sec., {}/sec.",
elem.read_rows, ReadableSize(elem.read_bytes), elapsed_seconds,
static_cast<size_t>(elem.read_rows / elapsed_seconds),
ReadableSize(elem.read_bytes / elapsed_seconds));
}
if (log_queries && elem.type >= log_queries_min_type && static_cast<Int64>(elem.query_duration_ms) >= log_queries_min_query_duration_ms)
{
if (auto query_log = context->getQueryLog())
query_log->add(elem);
}
if (log_processors_profiles)
{
if (auto processors_profile_log = context->getProcessorsProfileLog())
{
std::vector<UInt64> parents;
for (const auto & port : processor->getOutputs())
ProcessorProfileLogElement processor_elem;
processor_elem.event_time = time_in_seconds(finish_time);
processor_elem.event_time_microseconds = time_in_microseconds(finish_time);
processor_elem.query_id = elem.client_info.current_query_id;
auto get_proc_id = [](const IProcessor & proc) -> UInt64
{
if (!port.isConnected())
continue;
const IProcessor & next = port.getInputPort().getProcessor();
parents.push_back(get_proc_id(next));
return reinterpret_cast<std::uintptr_t>(&proc);
};
for (const auto & processor : query_pipeline.getProcessors())
{
std::vector<UInt64> parents;
for (const auto & port : processor->getOutputs())
{
if (!port.isConnected())
continue;
const IProcessor & next = port.getInputPort().getProcessor();
parents.push_back(get_proc_id(next));
}
processor_elem.id = get_proc_id(*processor);
processor_elem.parent_ids = std::move(parents);
processor_elem.plan_step = reinterpret_cast<std::uintptr_t>(processor->getQueryPlanStep());
processor_elem.plan_group = processor->getQueryPlanStepGroup();
processor_elem.processor_name = processor->getName();
processor_elem.elapsed_us = processor->getElapsedUs();
processor_elem.input_wait_elapsed_us = processor->getInputWaitElapsedUs();
processor_elem.output_wait_elapsed_us = processor->getOutputWaitElapsedUs();
auto stats = processor->getProcessorDataStats();
processor_elem.input_rows = stats.input_rows;
processor_elem.input_bytes = stats.input_bytes;
processor_elem.output_rows = stats.output_rows;
processor_elem.output_bytes = stats.output_bytes;
processors_profile_log->add(processor_elem);
}
}
}
processor_elem.id = get_proc_id(*processor);
processor_elem.parent_ids = std::move(parents);
processor_elem.plan_step = reinterpret_cast<std::uintptr_t>(processor->getQueryPlanStep());
processor_elem.plan_group = processor->getQueryPlanStepGroup();
processor_elem.processor_name = processor->getName();
processor_elem.elapsed_us = processor->getElapsedUs();
processor_elem.input_wait_elapsed_us = processor->getInputWaitElapsedUs();
processor_elem.output_wait_elapsed_us = processor->getOutputWaitElapsedUs();
auto stats = processor->getProcessorDataStats();
processor_elem.input_rows = stats.input_rows;
processor_elem.input_bytes = stats.input_bytes;
processor_elem.output_rows = stats.output_rows;
processor_elem.output_bytes = stats.output_bytes;
processors_profile_log->add(processor_elem);
if (implicit_txn_control)
{
try
{
implicit_txn_control->executeCommit(context->getSessionContext());
implicit_txn_control.reset();
}
catch (const Exception &)
{
/// An exception might happen when trying to commit the transaction. For example we might get an immediate exception
/// because ZK is down and wait_changes_become_visible_after_commit_mode == WAIT_UNKNOWN
implicit_txn_control.reset();
throw;
}
}
}
@ -945,27 +961,11 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
query_span->addAttributeIfNotEmpty("clickhouse.tracestate", OpenTelemetry::CurrentContext().tracestate);
query_span->addAttributeIfNotZero("clickhouse.read_rows", elem.read_rows);
query_span->addAttributeIfNotZero("clickhouse.read_bytes", elem.read_bytes);
query_span->addAttributeIfNotZero("clickhouse.written_rows", info.written_rows);
query_span->addAttributeIfNotZero("clickhouse.written_rows", elem.written_rows);
query_span->addAttributeIfNotZero("clickhouse.written_bytes", elem.written_bytes);
query_span->addAttributeIfNotZero("clickhouse.memory_usage", elem.memory_usage);
query_span->finish();
}
if (implicit_txn_control)
{
try
{
implicit_txn_control->executeCommit(context->getSessionContext());
implicit_txn_control.reset();
}
catch (const Exception &)
{
/// An exception might happen when trying to commit the transaction. For example we might get an immediate exception
/// because ZK is down and wait_changes_become_visible_after_commit_mode == WAIT_UNKNOWN
implicit_txn_control.reset();
throw;
}
}
};
auto exception_callback = [elem,


@ -1,5 +1,6 @@
#pragma once
#include <Processors/Formats/InputFormatErrorsLogger.h>
#include <Processors/ISource.h>
#include <IO/ReadBuffer.h>
#include <Interpreters/Context.h>
@ -55,9 +56,13 @@ public:
void addBuffer(std::unique_ptr<ReadBuffer> buffer) { owned_buffers.emplace_back(std::move(buffer)); }
void setErrorsLogger(const InputFormatErrorsLoggerPtr & errors_logger_) { errors_logger = errors_logger_; }
protected:
ColumnMappingPtr column_mapping{};
InputFormatErrorsLoggerPtr errors_logger;
private:
/// Number of currently parsed chunk (if parallel parsing is enabled)
size_t current_unit_number = 0;


@ -52,6 +52,31 @@ IRowInputFormat::IRowInputFormat(Block header, ReadBuffer & in_, Params params_)
{
}
void IRowInputFormat::logError()
{
String diagnostic;
String raw_data;
try
{
std::tie(diagnostic, raw_data) = getDiagnosticAndRawData();
}
catch (const Exception & exception)
{
diagnostic = "Cannot get diagnostic: " + exception.message();
raw_data = "Cannot get raw data: " + exception.message();
}
catch (...)
{
/// Error while trying to obtain verbose diagnostic. Ok to ignore.
}
trimLeft(diagnostic, '\n');
trimRight(diagnostic, '\n');
auto now_time = time(nullptr);
errors_logger->logError(InputFormatErrorsLogger::ErrorEntry{now_time, total_rows, diagnostic, raw_data});
}
Chunk IRowInputFormat::generate()
{
if (total_rows == 0)
@ -112,6 +137,9 @@ Chunk IRowInputFormat::generate()
if (params.allow_errors_num == 0 && params.allow_errors_ratio == 0)
throw;
if (errors_logger)
logError();
++num_errors;
Float64 current_error_ratio = static_cast<Float64>(num_errors) / total_rows;


@ -65,6 +65,10 @@ protected:
/// and collect as much as possible diagnostic information about error.
/// If not implemented, returns empty string.
virtual std::string getDiagnosticInfo() { return {}; }
/// Get diagnostic info and raw data for a row
virtual std::pair<std::string, std::string> getDiagnosticAndRawData() { return std::make_pair("", ""); }
void logError();
const BlockMissingValues & getMissingValues() const override { return block_missing_values; }


@ -235,8 +235,10 @@ static void insertNull(IColumn & column, DataTypePtr type)
assert_cast<ColumnNullable &>(column).insertDefault();
}
static void insertUUID(IColumn & column, DataTypePtr /*type*/, const char * value, size_t size)
static void insertUUID(IColumn & column, DataTypePtr type, const char * value, size_t size)
{
if (!isUUID(type))
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot insert MessagePack UUID into column with type {}.", type->getName());
ReadBufferFromMemory buf(value, size);
UUID uuid;
readBinaryBigEndian(uuid.toUnderType().items[0], buf);


@ -75,6 +75,7 @@ void ParallelParsingInputFormat::parserThreadFunction(ThreadGroupStatusPtr threa
InputFormatPtr input_format = internal_parser_creator(read_buffer);
input_format->setCurrentUnitNumber(current_ticket_number);
input_format->setErrorsLogger(errors_logger);
InternalParser parser(input_format);
unit.chunk_ext.chunk.clear();


@ -0,0 +1,83 @@
#include <Processors/Formats/InputFormatErrorsLogger.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <IO/WriteHelpers.h>
#include <Processors/Formats/IRowOutputFormat.h>
namespace DB
{
namespace
{
const String DEFAULT_OUTPUT_FORMAT = "CSV";
}
InputFormatErrorsLogger::InputFormatErrorsLogger(const ContextPtr & context)
{
String output_format = context->getSettingsRef().errors_output_format;
if (!FormatFactory::instance().isOutputFormat(output_format))
output_format = DEFAULT_OUTPUT_FORMAT;
if (context->hasInsertionTable())
table = context->getInsertionTable().getTableName();
if (context->getInsertionTable().hasDatabase())
database = context->getInsertionTable().getDatabaseName();
String path_in_setting = context->getSettingsRef().input_format_record_errors_file_path;
errors_file_path = context->getApplicationType() == Context::ApplicationType::SERVER ? context->getUserFilesPath() + path_in_setting
: path_in_setting;
while (fs::exists(errors_file_path))
{
errors_file_path += "_new";
}
write_buf = std::make_shared<WriteBufferFromFile>(errors_file_path);
header = Block{
{std::make_shared<DataTypeDateTime>(), "time"},
{std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()), "database"},
{std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()), "table"},
{std::make_shared<DataTypeUInt32>(), "offset"},
{std::make_shared<DataTypeString>(), "reason"},
{std::make_shared<DataTypeString>(), "raw_data"}};
writer = context->getOutputFormat(output_format, *write_buf, header);
}
InputFormatErrorsLogger::~InputFormatErrorsLogger()
{
writer->finalize();
writer->flush();
write_buf->finalize();
}
void InputFormatErrorsLogger::logErrorImpl(ErrorEntry entry)
{
auto error = header.cloneEmpty();
auto columns = error.mutateColumns();
columns[0]->insert(entry.time);
database.empty() ? columns[1]->insertDefault() : columns[1]->insert(database);
table.empty() ? columns[2]->insertDefault() : columns[2]->insert(table);
columns[3]->insert(entry.offset);
columns[4]->insert(entry.reason);
columns[5]->insert(entry.raw_data);
error.setColumns(std::move(columns));
writer->write(error);
}
void InputFormatErrorsLogger::logError(ErrorEntry entry)
{
logErrorImpl(entry);
}
ParallelInputFormatErrorsLogger::~ParallelInputFormatErrorsLogger() = default;
void ParallelInputFormatErrorsLogger::logError(ErrorEntry entry)
{
std::lock_guard lock(write_mutex);
logErrorImpl(entry);
}
}


@ -0,0 +1,54 @@
#pragma once
#include <IO/WriteBufferFromFile.h>
#include <Interpreters/Context.h>
namespace DB
{
class InputFormatErrorsLogger
{
public:
struct ErrorEntry
{
time_t time;
size_t offset;
String reason;
String raw_data;
};
InputFormatErrorsLogger(const ContextPtr & context);
virtual ~InputFormatErrorsLogger();
virtual void logError(ErrorEntry entry);
void logErrorImpl(ErrorEntry entry);
private:
Block header;
String errors_file_path;
std::shared_ptr<WriteBufferFromFile> write_buf;
OutputFormatPtr writer;
String database;
String table;
};
using InputFormatErrorsLoggerPtr = std::shared_ptr<InputFormatErrorsLogger>;
class ParallelInputFormatErrorsLogger : public InputFormatErrorsLogger
{
public:
ParallelInputFormatErrorsLogger(const ContextPtr & context) : InputFormatErrorsLogger(context) { }
~ParallelInputFormatErrorsLogger() override;
void logError(ErrorEntry entry) override;
private:
std::mutex write_mutex;
};
}


@ -35,12 +35,15 @@ void RowInputFormatWithDiagnosticInfo::updateDiagnosticInfo()
offset_of_current_row = in->offset();
}
String RowInputFormatWithDiagnosticInfo::getDiagnosticInfo()
std::pair<String, String> RowInputFormatWithDiagnosticInfo::getDiagnosticAndRawDataImpl(bool is_errors_record)
{
if (in->eof())
return "Buffer has gone, cannot extract information about what has been parsed.";
WriteBufferFromOwnString out_diag;
WriteBufferFromOwnString out_data;
WriteBufferFromOwnString out;
if (in->eof())
return std::make_pair(
"Buffer has gone, cannot extract information about what has been parsed.",
"Buffer has gone, cannot extract information about what has been parsed.");
const auto & header = getPort().getHeader();
MutableColumns columns = header.cloneEmptyColumns();
@ -49,8 +52,9 @@ String RowInputFormatWithDiagnosticInfo::getDiagnosticInfo()
size_t bytes_read_at_start_of_buffer = in->count() - in->offset();
if (bytes_read_at_start_of_buffer != bytes_read_at_start_of_buffer_on_prev_row)
{
out << "Could not print diagnostic info because two last rows aren't in buffer (rare case)\n";
return out.str();
out_diag << "Could not print diagnostic info because two last rows aren't in buffer (rare case)\n";
out_data << "Could not collect raw data because two last rows aren't in buffer (rare case)\n";
return std::make_pair(out_diag.str(), out_data.str());
}
max_length_of_column_name = 0;
@ -65,30 +69,49 @@ String RowInputFormatWithDiagnosticInfo::getDiagnosticInfo()
/// Roll back the cursor to the beginning of the previous or current row and parse all over again. But now we derive detailed information.
if (offset_of_prev_row <= in->buffer().size())
if (!is_errors_record && offset_of_prev_row <= in->buffer().size())
{
in->position() = in->buffer().begin() + offset_of_prev_row;
out << "\nRow " << (row_num - 1) << ":\n";
if (!parseRowAndPrintDiagnosticInfo(columns, out))
return out.str();
out_diag << "\nRow " << (row_num - 1) << ":\n";
if (!parseRowAndPrintDiagnosticInfo(columns, out_diag))
return std::make_pair(out_diag.str(), out_data.str());
}
else
{
if (in->buffer().size() < offset_of_current_row)
{
out << "Could not print diagnostic info because parsing of data hasn't started.\n";
return out.str();
out_diag << "Could not print diagnostic info because parsing of data hasn't started.\n";
out_data << "Could not collect raw data because parsing of data hasn't started.\n";
return std::make_pair(out_diag.str(), out_data.str());
}
in->position() = in->buffer().begin() + offset_of_current_row;
}
out << "\nRow " << row_num << ":\n";
parseRowAndPrintDiagnosticInfo(columns, out);
out << "\n";
char * data = in->position();
while (data < in->buffer().end() && *data != '\n' && *data != '\r' && *data != '\0')
{
out_data << *data;
++data;
}
return out.str();
out_diag << "\nRow " << row_num << ":\n";
parseRowAndPrintDiagnosticInfo(columns, out_diag);
out_diag << "\n";
return std::make_pair(out_diag.str(), out_data.str());
}
String RowInputFormatWithDiagnosticInfo::getDiagnosticInfo()
{
auto diagnostic_and_raw_data = getDiagnosticAndRawDataImpl(false);
return std::get<0>(diagnostic_and_raw_data);
}
std::pair<String, String> RowInputFormatWithDiagnosticInfo::getDiagnosticAndRawData()
{
return getDiagnosticAndRawDataImpl(true);
}
bool RowInputFormatWithDiagnosticInfo::deserializeFieldAndPrintDiagnosticInfo(const String & col_name,


@ -14,7 +14,9 @@ class RowInputFormatWithDiagnosticInfo : public IRowInputFormat
public:
RowInputFormatWithDiagnosticInfo(const Block & header_, ReadBuffer & in_, const Params & params_);
std::pair<String, String> getDiagnosticAndRawDataImpl(bool is_errors_record);
String getDiagnosticInfo() override;
std::pair<String, String> getDiagnosticAndRawData() override;
void resetParser() override;

View File

@ -140,6 +140,11 @@ namespace
size_t rows = 0;
size_t bytes = 0;
UInt32 shard_num = 0;
std::string cluster;
std::string distributed_table;
std::string remote_table;
/// dumpStructure() of the header -- obsolete
std::string block_header_string;
Block block_header;
@ -195,6 +200,14 @@ namespace
in.getFileName(), distributed_header.revision, DBMS_TCP_PROTOCOL_VERSION);
}
if (header_buf.hasPendingData())
{
readVarUInt(distributed_header.shard_num, header_buf);
readStringBinary(distributed_header.cluster, header_buf);
readStringBinary(distributed_header.distributed_table, header_buf);
readStringBinary(distributed_header.remote_table, header_buf);
}
    /// Add handling of new data here, for example:
///
/// if (header_buf.hasPendingData())
@ -621,18 +634,23 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
ReadBufferFromFile in(file_path);
const auto & distributed_header = readDistributedHeader(in, log);
auto connection = pool->get(timeouts, &distributed_header.insert_settings);
thread_trace_context = std::make_unique<OpenTelemetry::TracingContextHolder>(__PRETTY_FUNCTION__,
distributed_header.client_info.client_trace_context,
this->storage.getContext()->getOpenTelemetrySpanLog());
thread_trace_context->root_span.addAttribute("clickhouse.shard_num", distributed_header.shard_num);
thread_trace_context->root_span.addAttribute("clickhouse.cluster", distributed_header.cluster);
thread_trace_context->root_span.addAttribute("clickhouse.distributed", distributed_header.distributed_table);
thread_trace_context->root_span.addAttribute("clickhouse.remote", distributed_header.remote_table);
thread_trace_context->root_span.addAttribute("clickhouse.rows", distributed_header.rows);
thread_trace_context->root_span.addAttribute("clickhouse.bytes", distributed_header.bytes);
auto connection = pool->get(timeouts, &distributed_header.insert_settings);
LOG_DEBUG(log, "Sending `{}` to {} ({} rows, {} bytes)",
file_path,
connection->getDescription(),
formatReadableQuantity(distributed_header.rows),
formatReadableSizeWithBinarySuffix(distributed_header.bytes));
thread_trace_context = std::make_unique<OpenTelemetry::TracingContextHolder>(__PRETTY_FUNCTION__,
distributed_header.client_info.client_trace_context,
this->storage.getContext()->getOpenTelemetrySpanLog());
RemoteInserter remote{*connection, timeouts,
distributed_header.insert_query,
distributed_header.insert_settings,

View File

@ -171,7 +171,6 @@ void DistributedSink::writeAsync(const Block & block)
}
else
{
if (storage.getShardingKeyExpr() && (cluster->getShardsInfo().size() > 1))
return writeSplitAsync(block);
@ -291,6 +290,8 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si
auto thread_group = CurrentThread::getGroup();
return [this, thread_group, &job, &current_block, num_shards]()
{
OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__);
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
setThreadName("DistrOutStrProc");
@ -331,15 +332,19 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si
const Block & shard_block = (num_shards > 1) ? job.current_shard_block : current_block;
const Settings & settings = context->getSettingsRef();
/// Do not initiate INSERT for empty block.
size_t rows = shard_block.rows();
span.addAttribute("clickhouse.shard_num", shard_info.shard_num);
span.addAttribute("clickhouse.cluster", this->storage.cluster_name);
span.addAttribute("clickhouse.distributed", this->storage.getStorageID().getFullNameNotQuoted());
span.addAttribute("clickhouse.remote", [this]() { return storage.remote_database + "." + storage.remote_table; });
span.addAttribute("clickhouse.rows", rows);
span.addAttribute("clickhouse.bytes", [&shard_block]() { return toString(shard_block.bytes()); });
/// Do not initiate INSERT for empty block.
if (rows == 0)
return;
OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__);
span.addAttribute("clickhouse.shard_num", shard_info.shard_num);
span.addAttribute("clickhouse.written_rows", rows);
if (!job.is_local_job || !settings.prefer_localhost_replica)
{
if (!job.executor)
@ -610,20 +615,15 @@ void DistributedSink::writeSplitAsync(const Block & block)
void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id)
{
OpenTelemetry::SpanHolder span("DistributedSink::writeAsyncImpl()");
const auto & shard_info = cluster->getShardsInfo()[shard_id];
const auto & settings = context->getSettingsRef();
Block block_to_send = removeSuperfluousColumns(block);
span.addAttribute("clickhouse.shard_num", shard_info.shard_num);
span.addAttribute("clickhouse.written_rows", block.rows());
if (shard_info.hasInternalReplication())
{
if (shard_info.isLocal() && settings.prefer_localhost_replica)
/// Prefer insert into current instance directly
writeToLocal(block_to_send, shard_info.getLocalNodeCount());
writeToLocal(shard_info, block_to_send, shard_info.getLocalNodeCount());
else
{
const auto & path = shard_info.insertPathForInternalReplication(
@ -631,13 +631,13 @@ void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id)
settings.use_compact_format_in_distributed_parts_names);
if (path.empty())
throw Exception("Directory name for async inserts is empty", ErrorCodes::LOGICAL_ERROR);
writeToShard(block_to_send, {path});
writeToShard(shard_info, block_to_send, {path});
}
}
else
{
if (shard_info.isLocal() && settings.prefer_localhost_replica)
writeToLocal(block_to_send, shard_info.getLocalNodeCount());
writeToLocal(shard_info, block_to_send, shard_info.getLocalNodeCount());
std::vector<std::string> dir_names;
for (const auto & address : cluster->getShardsAddresses()[shard_id])
@ -645,30 +645,44 @@ void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id)
dir_names.push_back(address.toFullString(settings.use_compact_format_in_distributed_parts_names));
if (!dir_names.empty())
writeToShard(block_to_send, dir_names);
writeToShard(shard_info, block_to_send, dir_names);
}
}
void DistributedSink::writeToLocal(const Block & block, size_t repeats)
void DistributedSink::writeToLocal(const Cluster::ShardInfo & shard_info, const Block & block, size_t repeats)
{
OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__);
span.addAttribute("db.statement", this->query_string);
span.addAttribute("clickhouse.shard_num", shard_info.shard_num);
span.addAttribute("clickhouse.cluster", this->storage.cluster_name);
span.addAttribute("clickhouse.distributed", this->storage.getStorageID().getFullNameNotQuoted());
span.addAttribute("clickhouse.remote", [this]() { return storage.remote_database + "." + storage.remote_table; });
span.addAttribute("clickhouse.rows", [&block]() { return toString(block.rows()); });
span.addAttribute("clickhouse.bytes", [&block]() { return toString(block.bytes()); });
InterpreterInsertQuery interp(query_ast, context, allow_materialized);
try
{
InterpreterInsertQuery interp(query_ast, context, allow_materialized);
auto block_io = interp.execute();
PushingPipelineExecutor executor(block_io.pipeline);
auto block_io = interp.execute();
PushingPipelineExecutor executor(block_io.pipeline);
executor.start();
writeBlockConvert(executor, block, repeats, log);
executor.finish();
executor.start();
writeBlockConvert(executor, block, repeats, log);
executor.finish();
}
catch (...)
{
span.addAttribute(std::current_exception());
throw;
}
}
void DistributedSink::writeToShard(const Block & block, const std::vector<std::string> & dir_names)
void DistributedSink::writeToShard(const Cluster::ShardInfo & shard_info, const Block & block, const std::vector<std::string> & dir_names)
{
OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__);
span.addAttribute("clickhouse.shard_num", shard_info.shard_num);
const auto & settings = context->getSettingsRef();
const auto & distributed_settings = storage.getDistributedSettingsRef();
@ -759,6 +773,11 @@ void DistributedSink::writeToShard(const Block & block, const std::vector<std::s
header_stream.write(block.cloneEmpty());
}
writeVarUInt(shard_info.shard_num, header_buf);
writeStringBinary(this->storage.cluster_name, header_buf);
writeStringBinary(this->storage.getStorageID().getFullNameNotQuoted(), header_buf);
writeStringBinary(this->storage.remote_database + "." + this->storage.remote_table, header_buf);
/// Add new fields here, for example:
/// writeVarUInt(my_new_data, header_buf);
/// And note that it is safe, because we have checksum and size for header.
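    /// A hedged illustration of how such a future field would be paired across the two
    /// sides (my_new_data is hypothetical, mirroring the example above):
    ///
    ///     writeVarUInt(my_new_data, header_buf);                       /// sender side
    ///     if (header_buf.hasPendingData())                             /// receiver side
    ///         readVarUInt(distributed_header.my_new_data, header_buf);
    ///
    /// Presumably a receiver built before the field was added simply leaves the extra
    /// bytes unread, and a newer receiver skips the branch when the sender did not
    /// write them, which is why appending fields at the end stays compatible.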

View File

@ -69,9 +69,9 @@ private:
Block removeSuperfluousColumns(Block block) const;
/// Increments finished_writings_count after each repeat.
void writeToLocal(const Block & block, size_t repeats);
void writeToLocal(const Cluster::ShardInfo & shard_info, const Block & block, size_t repeats);
void writeToShard(const Block & block, const std::vector<std::string> & dir_names);
void writeToShard(const Cluster::ShardInfo & shard_info, const Block & block, const std::vector<std::string> & dir_names);
/// Performs synchronous insertion to remote nodes. If timeout_exceeded flag was set, throws.

View File

@ -1242,6 +1242,9 @@ protected:
/// Attaches restored parts to the storage.
virtual void attachRestoredParts(MutableDataPartsVector && parts) = 0;
void resetObjectColumnsFromActiveParts(const DataPartsLock & lock);
void updateObjectColumns(const DataPartPtr & part, const DataPartsLock & lock);
static void incrementInsertedPartsProfileEvent(MergeTreeDataPartType type);
static void incrementMergedPartsProfileEvent(MergeTreeDataPartType type);
@ -1329,9 +1332,6 @@ private:
DataPartsVector & duplicate_parts_to_remove,
MutableDataPartsVector & parts_from_wal);
void resetObjectColumnsFromActiveParts(const DataPartsLock & lock);
void updateObjectColumns(const DataPartPtr & part, const DataPartsLock & lock);
/// Create zero-copy exclusive lock for part and disk. Useful for coordination of
/// distributed operations which can lead to data duplication. Implemented only in ReplicatedMergeTree.
virtual std::optional<ZeroCopyLock> tryCreateZeroCopyExclusiveLock(const String &, const DiskPtr &) { return std::nullopt; }

View File

@ -483,16 +483,6 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPart(
return temp_part;
}
void MergeTreeDataWriter::deduceTypesOfObjectColumns(const StorageSnapshotPtr & storage_snapshot, Block & block)
{
if (!storage_snapshot->object_columns.empty())
{
auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical).withExtendedObjects();
auto storage_columns = storage_snapshot->getColumns(options);
convertObjectsToTuples(block, storage_columns);
}
}
MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
const String & part_name,
MergeTreeDataPartType part_type,

View File

@ -45,8 +45,6 @@ public:
*/
static BlocksWithPartition splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context);
static void deduceTypesOfObjectColumns(const StorageSnapshotPtr & storage_snapshot, Block & block);
/// This structure contains not completely written temporary part.
/// Some writes may happen asynchronously, e.g. for blob storages.
/// You should call finalize() to wait until all data is written.

View File

@ -1,6 +1,7 @@
#include <Storages/MergeTree/MergeTreeSink.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/StorageMergeTree.h>
#include <DataTypes/ObjectUtils.h>
#include <Interpreters/PartLog.h>
namespace ProfileEvents
@ -56,7 +57,7 @@ void MergeTreeSink::consume(Chunk chunk)
{
auto block = getHeader().cloneWithColumns(chunk.detachColumns());
storage.writer.deduceTypesOfObjectColumns(storage_snapshot, block);
deduceTypesOfObjectColumns(storage_snapshot, block);
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context);
using DelayedPartitions = std::vector<MergeTreeSink::DelayedChunk::Partition>;

View File

@ -1,6 +1,7 @@
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeSink.h>
#include <DataTypes/ObjectUtils.h>
#include <Interpreters/PartLog.h>
#include <Common/SipHash.h>
#include <Common/ZooKeeper/KeeperException.h>
@ -161,7 +162,7 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)
*/
size_t replicas_num = checkQuorumPrecondition(zookeeper);
storage.writer.deduceTypesOfObjectColumns(storage_snapshot, block);
deduceTypesOfObjectColumns(storage_snapshot, block);
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context);
using DelayedPartitions = std::vector<ReplicatedMergeTreeSink::DelayedChunk::Partition>;
@ -203,11 +204,11 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)
}
block_id = temp_part.part->getZeroLevelPartBlockID(block_dedup_token);
LOG_DEBUG(log, "Wrote block with ID '{}', {} rows on {} replicas", block_id, current_block.block.rows(), replicas_num);
LOG_DEBUG(log, "Wrote block with ID '{}', {} rows{}", block_id, current_block.block.rows(), quorumLogMessage(replicas_num));
}
else
{
LOG_DEBUG(log, "Wrote block with {} rows on {} replicas", current_block.block.rows(), replicas_num);
LOG_DEBUG(log, "Wrote block with {} rows{}", current_block.block.rows(), quorumLogMessage(replicas_num));
}
UInt64 elapsed_ns = watch.elapsed();
@ -639,7 +640,7 @@ void ReplicatedMergeTreeSink::waitForQuorum(
size_t replicas_num) const
{
/// We are waiting for quorum to be satisfied.
LOG_TRACE(log, "Waiting for quorum '{}' for part {} on {} replicas", quorum_path, part_name, replicas_num);
LOG_TRACE(log, "Waiting for quorum '{}' for part {}{}", quorum_path, part_name, quorumLogMessage(replicas_num));
try
{
@ -684,6 +685,13 @@ void ReplicatedMergeTreeSink::waitForQuorum(
LOG_TRACE(log, "Quorum '{}' for part {} satisfied", quorum_path, part_name);
}
String ReplicatedMergeTreeSink::quorumLogMessage(size_t replicas_num) const
{
if (!isQuorumEnabled())
return "";
return fmt::format(" (quorum {} of {} replicas)", getQuorumSize(replicas_num), replicas_num);
}
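/// For example, assuming getQuorumSize() returns 2 for 3 replicas, this yields the
/// log suffix " (quorum 2 of 3 replicas)"; when quorum is disabled it returns "".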
size_t ReplicatedMergeTreeSink::getQuorumSize(size_t replicas_num) const
{
if (!isQuorumEnabled())

View File

@ -96,6 +96,7 @@ private:
size_t getQuorumSize(size_t replicas_num) const;
bool isQuorumEnabled() const;
String quorumLogMessage(size_t replicas_num) const; /// Used in logs for debug purposes
size_t quorum_timeout_ms;
size_t max_parts_per_block;

View File

@ -335,6 +335,13 @@ void StorageMergeTree::alter(
mutation_version = startMutation(maybe_mutation_commands, local_context);
}
{
/// Reset Object columns, because column of type
/// Object may be added or dropped by alter.
auto parts_lock = lockParts();
resetObjectColumnsFromActiveParts(parts_lock);
}
/// Always execute required mutations synchronously, because alters
/// should be executed in sequential order.
if (!maybe_mutation_commands.empty())

View File

@ -3649,7 +3649,7 @@ void StorageReplicatedMergeTree::updateQuorum(const String & part_name, bool is_
if (quorum_entry.replicas.size() >= quorum_entry.required_number_of_replicas)
{
/// The quorum is reached. Delete the node, and update information about the last part that was successfully written with quorum.
LOG_TRACE(log, "Got {} (of {}) replicas confirmed quorum {}, going to remove node",
LOG_TRACE(log, "Got {} (of {} required) replicas confirmed quorum {}, going to remove node",
quorum_entry.replicas.size(), quorum_entry.required_number_of_replicas, quorum_status_path);
Coordination::Requests ops;
@ -4649,6 +4649,13 @@ bool StorageReplicatedMergeTree::executeMetadataAlter(const StorageReplicatedMer
LOG_INFO(log, "Applied changes to the metadata of the table. Current metadata version: {}", metadata_version);
}
{
/// Reset Object columns, because column of type
/// Object may be added or dropped by alter.
auto parts_lock = lockParts();
resetObjectColumnsFromActiveParts(parts_lock);
}
/// This transaction may not happen, but it's OK, because on the next retry we will eventually create/update this node
    /// TODO Maybe do it in one transaction for Replicated database?
zookeeper->createOrUpdate(fs::path(replica_path) / "metadata_version", std::to_string(metadata_version), zkutil::CreateMode::Persistent);

View File

@ -350,7 +350,8 @@ if __name__ == "__main__":
# randomizer, we should remove it after Sep 2022
try:
subprocess.check_call(
f"docker volume ls -q | grep '{VOLUME_NAME}_.*_volume' | xargs --no-run-if-empty docker volume rm",
"docker volume rm $(docker volume ls -q | "
f"grep '{VOLUME_NAME}_.*_volume')",
shell=True,
)
except Exception as ex:

View File

@ -27,7 +27,7 @@ def cluster():
def assert_objects_count(cluster, objects_count, path="data/"):
minio = cluster.minio_client
s3_objects = list(minio.list_objects(cluster.minio_bucket, path, recursive=True))
s3_objects = list(minio.list_objects(cluster.minio_bucket, path))
if objects_count != len(s3_objects):
for s3_object in s3_objects:
object_meta = minio.stat_object(cluster.minio_bucket, s3_object.object_name)

View File

@ -1,4 +1,4 @@
<yandex>
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
@ -19,9 +19,19 @@
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
<can_become_leader>true</can_become_leader>
<priority>3</priority>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</yandex>
</clickhouse>

View File

@ -0,0 +1,37 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<four_letter_word_white_list>*</four_letter_word_white_list>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<min_session_timeout_ms>5000</min_session_timeout_ms>
<snapshot_distance>75</snapshot_distance>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,37 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<four_letter_word_white_list>*</four_letter_word_white_list>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<min_session_timeout_ms>5000</min_session_timeout_ms>
<snapshot_distance>75</snapshot_distance>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -10,7 +10,15 @@ from kazoo.client import KazooClient
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1", main_configs=["configs/keeper_config.xml"], stay_alive=True
"node1", main_configs=["configs/keeper_config1.xml"], stay_alive=True
)
node2 = cluster.add_instance(
"node2", main_configs=["configs/keeper_config2.xml"], stay_alive=True
)
node3 = cluster.add_instance(
"node3", main_configs=["configs/keeper_config3.xml"], stay_alive=True
)
bool_struct = struct.Struct("B")
@ -61,7 +69,7 @@ def wait_node(node):
def wait_nodes():
for n in [node1]:
for n in [node1, node2, node3]:
wait_node(n)
@ -165,3 +173,21 @@ def test_session_timeout(started_cluster):
negotiated_timeout, _ = handshake(node1.name, session_timeout=20000, session_id=0)
assert negotiated_timeout == 10000
def test_session_close_shutdown(started_cluster):
wait_nodes()
node1_zk = get_fake_zk(node1.name)
node2_zk = get_fake_zk(node2.name)
eph_node = "/test_node"
node2_zk.create(eph_node, ephemeral=True)
assert node1_zk.exists(eph_node) != None
# shutdown while session is active
node2.stop_clickhouse()
assert node1_zk.exists(eph_node) == None
node2.start_clickhouse()

View File

@ -25,7 +25,7 @@ def cluster():
def assert_objects_count(cluster, objects_count, path="data/"):
minio = cluster.minio_client
s3_objects = list(minio.list_objects(cluster.minio_bucket, path, recursive=True))
s3_objects = list(minio.list_objects(cluster.minio_bucket, path))
if objects_count != len(s3_objects):
for s3_object in s3_objects:
object_meta = minio.stat_object(cluster.minio_bucket, s3_object.object_name)

View File

@ -38,6 +38,20 @@
<path>/jbod1/</path>
<max_size>1000000000</max_size>
</s3_with_cache_and_jbod>
<s3_r>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<s3_max_single_part_upload_size>33554432</s3_max_single_part_upload_size>
</s3_r>
<s3_cache_r>
<type>cache</type>
<disk>s3_r</disk>
<path>/s3_cache_r/</path>
<max_size>1000000000</max_size>
<do_not_evict_index_and_mark_files>1</do_not_evict_index_and_mark_files>
</s3_cache_r>
</disks>
<policies>
<s3>
@ -78,6 +92,13 @@
</main>
</volumes>
</s3_with_cache_and_jbod>
<s3_cache_r>
<volumes>
<main>
<disk>s3_cache_r</disk>
</main>
</volumes>
</s3_cache_r>
</policies>
</storage_configuration>

View File

@ -6,7 +6,6 @@ import pytest
from helpers.cluster import ClickHouseCluster
from helpers.utility import generate_values, replace_config, SafeThread
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@ -36,6 +35,7 @@ def cluster():
"/jbod1:size=2M",
],
)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
@ -121,17 +121,11 @@ def run_s3_mocks(cluster):
def wait_for_delete_s3_objects(cluster, expected, timeout=30):
minio = cluster.minio_client
while timeout > 0:
if (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== expected
):
if len(list(minio.list_objects(cluster.minio_bucket, "data/"))) == expected:
return
timeout -= 1
time.sleep(1)
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== expected
)
assert len(list(minio.list_objects(cluster.minio_bucket, "data/"))) == expected
@pytest.fixture(autouse=True)
@ -147,9 +141,7 @@ def drop_table(cluster, node_name):
wait_for_delete_s3_objects(cluster, 0)
finally:
        # Remove extra objects to prevent cascading test failures
for obj in list(
minio.list_objects(cluster.minio_bucket, "data/", recursive=True)
):
for obj in list(minio.list_objects(cluster.minio_bucket, "data/")):
minio.remove_object(cluster.minio_bucket, obj.object_name)
@ -171,7 +163,7 @@ def test_simple_insert_select(
node.query("INSERT INTO s3_test VALUES {}".format(values1))
assert node.query("SELECT * FROM s3_test order by dt, id FORMAT Values") == values1
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD + files_per_part
)
@ -182,7 +174,7 @@ def test_simple_insert_select(
== values1 + "," + values2
)
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD + files_per_part * 2
)
@ -226,7 +218,7 @@ def test_insert_same_partition_and_merge(cluster, merge_vertical, node_name):
node.query("SELECT count(distinct(id)) FROM s3_test FORMAT Values") == "(8192)"
)
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD_PER_PART_WIDE * 6 + FILES_OVERHEAD
)
@ -315,28 +307,28 @@ def test_attach_detach_partition(cluster, node_name):
)
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(8192)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
node.query("ALTER TABLE s3_test DETACH PARTITION '2020-01-03'")
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(4096)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
node.query("ALTER TABLE s3_test ATTACH PARTITION '2020-01-03'")
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(8192)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
node.query("ALTER TABLE s3_test DROP PARTITION '2020-01-03'")
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(4096)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE
)
@ -347,8 +339,7 @@ def test_attach_detach_partition(cluster, node_name):
)
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(0)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD
len(list(minio.list_objects(cluster.minio_bucket, "data/"))) == FILES_OVERHEAD
)
@ -366,21 +357,21 @@ def test_move_partition_to_another_disk(cluster, node_name):
)
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(8192)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
node.query("ALTER TABLE s3_test MOVE PARTITION '2020-01-04' TO DISK 'hdd'")
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(8192)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE
)
node.query("ALTER TABLE s3_test MOVE PARTITION '2020-01-04' TO DISK 's3'")
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(8192)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
@ -401,7 +392,7 @@ def test_table_manipulations(cluster, node_name):
node.query("RENAME TABLE s3_test TO s3_renamed")
assert node.query("SELECT count(*) FROM s3_renamed FORMAT Values") == "(8192)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
node.query("RENAME TABLE s3_renamed TO s3_test")
@ -412,15 +403,14 @@ def test_table_manipulations(cluster, node_name):
node.query("ATTACH TABLE s3_test")
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(8192)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
node.query("TRUNCATE TABLE s3_test")
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(0)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD
len(list(minio.list_objects(cluster.minio_bucket, "data/"))) == FILES_OVERHEAD
)
@ -445,7 +435,7 @@ def test_move_replace_partition_to_another_table(cluster, node_name):
assert node.query("SELECT sum(id) FROM s3_test FORMAT Values") == "(0)"
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(16384)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 4
)
@ -459,7 +449,7 @@ def test_move_replace_partition_to_another_table(cluster, node_name):
assert node.query("SELECT count(*) FROM s3_clone FORMAT Values") == "(8192)"
# Number of objects in S3 should be unchanged.
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD * 2 + FILES_OVERHEAD_PER_PART_WIDE * 4
)
@ -473,7 +463,7 @@ def test_move_replace_partition_to_another_table(cluster, node_name):
assert node.query("SELECT sum(id) FROM s3_test FORMAT Values") == "(0)"
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(16384)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD * 2 + FILES_OVERHEAD_PER_PART_WIDE * 6
)
@ -494,14 +484,14 @@ def test_move_replace_partition_to_another_table(cluster, node_name):
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(16384)"
# Data should remain in S3
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 4
)
node.query("ALTER TABLE s3_test FREEZE")
    # Number of S3 objects should be unchanged.
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 4
)
@ -510,7 +500,7 @@ def test_move_replace_partition_to_another_table(cluster, node_name):
wait_for_delete_s3_objects(cluster, FILES_OVERHEAD_PER_PART_WIDE * 4)
for obj in list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)):
for obj in list(minio.list_objects(cluster.minio_bucket, "data/")):
minio.remove_object(cluster.minio_bucket, obj.object_name)
@ -531,7 +521,7 @@ def test_freeze_unfreeze(cluster, node_name):
node.query("TRUNCATE TABLE s3_test")
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
@ -546,8 +536,7 @@ def test_freeze_unfreeze(cluster, node_name):
# Data should be removed from S3.
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD
len(list(minio.list_objects(cluster.minio_bucket, "data/"))) == FILES_OVERHEAD
)
@ -570,7 +559,7 @@ def test_freeze_system_unfreeze(cluster, node_name):
node.query("TRUNCATE TABLE s3_test")
node.query("DROP TABLE s3_test_removed NO DELAY")
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
@ -581,8 +570,7 @@ def test_freeze_system_unfreeze(cluster, node_name):
# Data should be removed from S3.
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD
len(list(minio.list_objects(cluster.minio_bucket, "data/"))) == FILES_OVERHEAD
)
@ -709,7 +697,7 @@ def test_lazy_seek_optimization_for_async_read(cluster, node_name):
node.query("SELECT * FROM s3_test WHERE value LIKE '%abc%' ORDER BY value LIMIT 10")
node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
minio = cluster.minio_client
for obj in list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)):
for obj in list(minio.list_objects(cluster.minio_bucket, "data/")):
minio.remove_object(cluster.minio_bucket, obj.object_name)
@ -754,3 +742,79 @@ def test_store_cleanup_disk_s3(cluster, node_name):
"CREATE TABLE s3_test UUID '00000000-1000-4000-8000-000000000001' (n UInt64) Engine=MergeTree() ORDER BY n SETTINGS storage_policy='s3';"
)
node.query("INSERT INTO s3_test SELECT 1")
@pytest.mark.parametrize("node_name", ["node"])
def test_cache_setting_compatibility(cluster, node_name):
node = cluster.instances[node_name]
node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
node.query(
"CREATE TABLE s3_test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_cache_r';"
)
node.query(
"INSERT INTO s3_test SELECT * FROM generateRandom('key UInt32, value String') LIMIT 500"
)
result = node.query("SYSTEM DROP FILESYSTEM CACHE")
result = node.query(
"SELECT count() FROM system.filesystem_cache WHERE cache_path LIKE '%persistent'"
)
assert int(result) == 0
node.query("SELECT * FROM s3_test")
result = node.query(
"SELECT count() FROM system.filesystem_cache WHERE cache_path LIKE '%persistent'"
)
assert int(result) > 0
config_path = os.path.join(
SCRIPT_DIR,
f"./{cluster.instances_dir_name}/node/configs/config.d/storage_conf.xml",
)
replace_config(
config_path,
"<do_not_evict_index_and_mark_files>1</do_not_evict_index_and_mark_files>",
"<do_not_evict_index_and_mark_files>0</do_not_evict_index_and_mark_files>",
)
result = node.query("DESCRIBE CACHE 's3_cache_r'")
assert result.strip().endswith("1")
node.restart_clickhouse()
result = node.query("DESCRIBE CACHE 's3_cache_r'")
assert result.strip().endswith("0")
result = node.query(
"SELECT count() FROM system.filesystem_cache WHERE cache_path LIKE '%persistent'"
)
assert int(result) > 0
node.query("SELECT * FROM s3_test FORMAT Null")
assert not node.contains_in_log("No such file or directory: Cache info:")
replace_config(
config_path,
"<do_not_evict_index_and_mark_files>0</do_not_evict_index_and_mark_files>",
"<do_not_evict_index_and_mark_files>1</do_not_evict_index_and_mark_files>",
)
result = node.query(
"SELECT count() FROM system.filesystem_cache WHERE cache_path LIKE '%persistent'"
)
assert int(result) > 0
node.restart_clickhouse()
result = node.query("DESCRIBE CACHE 's3_cache_r'")
assert result.strip().endswith("1")
node.query("SELECT * FROM s3_test FORMAT Null")
assert not node.contains_in_log("No such file or directory: Cache info:")

View File

@ -62,7 +62,7 @@ init_list = {
def get_s3_events(instance):
result = init_list.copy()
events = instance.query(
"SELECT event, value FROM system.events WHERE event LIKE '%S3%'"
"SELECT event,value FROM system.events WHERE event LIKE '%S3%'"
).split("\n")
for event in events:
ev = event.split("\t")
@ -85,20 +85,20 @@ def get_minio_stat(cluster):
)
).text.split("\n")
for line in stat:
x = re.search(r"s3_requests_total(\{.*\})?\s(\d+)(\s.*)?", line)
x = re.search("s3_requests_total(\{.*\})?\s(\d+)(\s.*)?", line)
if x != None:
y = re.search('.*api="(get|list|head|select).*', x.group(1))
if y != None:
result["get_requests"] += int(x.group(2))
else:
result["set_requests"] += int(x.group(2))
x = re.search(r"s3_errors_total(\{.*\})?\s(\d+)(\s.*)?", line)
x = re.search("s3_errors_total(\{.*\})?\s(\d+)(\s.*)?", line)
if x != None:
result["errors"] += int(x.group(2))
x = re.search(r"s3_rx_bytes_total(\{.*\})?\s([\d\.e\+\-]+)(\s.*)?", line)
x = re.search("s3_rx_bytes_total(\{.*\})?\s([\d\.e\+\-]+)(\s.*)?", line)
if x != None:
result["tx_bytes"] += float(x.group(2))
x = re.search(r"s3_tx_bytes_total(\{.*\})?\s([\d\.e\+\-]+)(\s.*)?", line)
x = re.search("s3_tx_bytes_total(\{.*\})?\s([\d\.e\+\-]+)(\s.*)?", line)
if x != None:
result["rx_bytes"] += float(x.group(2))
return result
@ -128,10 +128,8 @@ def get_query_stat(instance, hint):
def get_minio_size(cluster):
minio = cluster.minio_client
size = 0
for obj_level1 in minio.list_objects(
cluster.minio_bucket, prefix="data/", recursive=True
):
size += obj_level1.size
for obj in minio.list_objects(cluster.minio_bucket, "data/"):
size += obj.size
return size
@ -147,7 +145,7 @@ def test_profile_events(cluster):
metrics0 = get_s3_events(instance)
minio0 = get_minio_stat(cluster)
query1 = "CREATE TABLE test_s3.test_s3 (key UInt32, value UInt32) ENGINE=MergeTree PRIMARY KEY key ORDER BY key SETTINGS storage_policy = 's3'"
query1 = "CREATE TABLE test_s3.test_s3 (key UInt32, value UInt32) ENGINE=MergeTree PRIMARY KEY key ORDER BY key SETTINGS storage_policy='s3'"
instance.query(query1)
size1 = get_minio_size(cluster)
@ -169,7 +167,7 @@ def test_profile_events(cluster):
metrics1["WriteBufferFromS3Bytes"] - metrics0["WriteBufferFromS3Bytes"] == size1
)
query2 = "INSERT INTO test_s3.test_s3 VALUES"
query2 = "INSERT INTO test_s3.test_s3 FORMAT Values"
instance.query(query2 + " (1,1)")
size2 = get_minio_size(cluster)
@ -184,12 +182,9 @@ def test_profile_events(cluster):
metrics2["S3WriteRequestsCount"] - metrics1["S3WriteRequestsCount"]
== minio2["set_requests"] - minio1["set_requests"]
)
stat2 = get_query_stat(instance, query2)
for metric in stat2:
assert stat2[metric] == metrics2[metric] - metrics1[metric]
assert (
metrics2["WriteBufferFromS3Bytes"] - metrics1["WriteBufferFromS3Bytes"]
== size2 - size1
@ -210,7 +205,6 @@ def test_profile_events(cluster):
== minio3["set_requests"] - minio2["set_requests"]
)
stat3 = get_query_stat(instance, query3)
# With async reads profile events are not updated fully because reads are done in a separate thread.
# for metric in stat3:
# print(metric)

View File

@ -113,7 +113,7 @@ def drop_table(cluster):
minio = cluster.minio_client
    # Remove extra objects to prevent cascading test failures
for obj in list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)):
for obj in list(minio.list_objects(cluster.minio_bucket, "data/")):
minio.remove_object(cluster.minio_bucket, obj.object_name)
@ -130,9 +130,9 @@ def test_insert_select_replicated(cluster, min_rows_for_wide_part, files_per_par
insert(cluster, node_idxs=[1, 2, 3], verify=True)
minio = cluster.minio_client
assert len(
list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True))
) == 3 * (FILES_OVERHEAD + files_per_part * 3)
assert len(list(minio.list_objects(cluster.minio_bucket, "data/"))) == 3 * (
FILES_OVERHEAD + files_per_part * 3
)
def test_drop_cache_on_cluster(cluster):

View File

@ -87,7 +87,7 @@ def drop_table(cluster):
minio = cluster.minio_client
    # Remove extra objects to prevent cascading test failures
for obj in list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)):
for obj in list(minio.list_objects(cluster.minio_bucket, "data/")):
minio.remove_object(cluster.minio_bucket, obj.object_name)
@ -124,6 +124,6 @@ def test_insert_select_replicated(cluster, min_rows_for_wide_part, files_per_par
)
minio = cluster.minio_client
assert len(
list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True))
) == (3 * FILES_OVERHEAD) + (files_per_part * 3)
assert len(list(minio.list_objects(cluster.minio_bucket, "data/"))) == (
3 * FILES_OVERHEAD
) + (files_per_part * 3)

View File

@ -39,9 +39,7 @@ def cluster():
def get_large_objects_count(cluster, size=100, folder="data"):
minio = cluster.minio_client
counter = 0
for obj in minio.list_objects(
cluster.minio_bucket, "{}/".format(folder), recursive=True
):
for obj in minio.list_objects(cluster.minio_bucket, "{}/".format(folder)):
if obj.size is not None and obj.size >= size:
counter = counter + 1
return counter

View File

@ -0,0 +1,6 @@
{% for storage in ["MergeTree", "ReplicatedMergeTree('/clickhouse/tables/{database}/test_01825_add_column/', 'r1')"] -%}
{"id":"1","s":{"k1":0}}
{"id":"2","s":{"k1":100}}
{"id":"1"}
{"id":"2"}
{% endfor -%}

View File

@ -0,0 +1,23 @@
-- Tags: no-fasttest
{% for storage in ["MergeTree", "ReplicatedMergeTree('/clickhouse/tables/{database}/test_01825_add_column/', 'r1')"] -%}
DROP TABLE IF EXISTS t_json_add_column;
SET allow_experimental_object_type = 1;
CREATE TABLE t_json_add_column (id UInt64) ENGINE = {{ storage }} ORDER BY tuple();
INSERT INTO t_json_add_column VALUES (1);
ALTER TABLE t_json_add_column ADD COLUMN s JSON;
INSERT INTO t_json_add_column VALUES(2, '{"k1": 100}');
SELECT * FROM t_json_add_column ORDER BY id FORMAT JSONEachRow;
ALTER TABLE t_json_add_column DROP COLUMN s;
SELECT * FROM t_json_add_column ORDER BY id FORMAT JSONEachRow;
DROP TABLE t_json_add_column;
{% endfor -%}

View File

@ -0,0 +1,21 @@
-- Regression test for the case when Join stores data on disk and receives an empty block.
-- Because of this it does not create an empty file, although one is expected.
SET max_threads = 1;
SET join_algorithm = 'auto';
SET max_rows_in_join = 1000;
SET optimize_aggregation_in_order = 1;
SET max_block_size = 1000;
DROP TABLE IF EXISTS join_on_disk;
SYSTEM STOP MERGES join_on_disk;
CREATE TABLE join_on_disk (id Int) Engine=MergeTree() ORDER BY id;
INSERT INTO join_on_disk SELECT number as id FROM numbers_mt(50000);
INSERT INTO join_on_disk SELECT number as id FROM numbers_mt(1000);
SELECT id FROM join_on_disk lhs LEFT JOIN (SELECT id FROM join_on_disk GROUP BY id) rhs USING (id) FORMAT Null;
DROP TABLE join_on_disk;

View File

@ -0,0 +1,8 @@
{"operation_name":"void DB::DistributedSink::writeToLocal(const Cluster::ShardInfo &, const DB::Block &, size_t)","cluster":"test_cluster_two_shards_localhost","shard":"1","rows":"1","bytes":"8"}
{"operation_name":"void DB::DistributedSink::writeToLocal(const Cluster::ShardInfo &, const DB::Block &, size_t)","cluster":"test_cluster_two_shards_localhost","shard":"2","rows":"1","bytes":"8"}
{"operation_name":"void DB::StorageDistributedDirectoryMonitor::processFile(const std::string &)","cluster":"test_cluster_two_shards_localhost","shard":"1","rows":"1","bytes":"8"}
{"operation_name":"void DB::StorageDistributedDirectoryMonitor::processFile(const std::string &)","cluster":"test_cluster_two_shards_localhost","shard":"2","rows":"1","bytes":"8"}
{"operation_name":"auto DB::DistributedSink::runWritingJob(DB::DistributedSink::JobReplica &, const DB::Block &, size_t)::(anonymous class)::operator()() const","cluster":"test_cluster_two_shards_localhost","shard":"1","rows":"1","bytes":"8"}
{"operation_name":"auto DB::DistributedSink::runWritingJob(DB::DistributedSink::JobReplica &, const DB::Block &, size_t)::(anonymous class)::operator()() const","cluster":"test_cluster_two_shards_localhost","shard":"2","rows":"1","bytes":"8"}
{"operation_name":"auto DB::DistributedSink::runWritingJob(DB::DistributedSink::JobReplica &, const DB::Block &, size_t)::(anonymous class)::operator()() const","cluster":"test_cluster_two_shards_localhost","shard":"1","rows":"1","bytes":"8"}
{"operation_name":"auto DB::DistributedSink::runWritingJob(DB::DistributedSink::JobReplica &, const DB::Block &, size_t)::(anonymous class)::operator()() const","cluster":"test_cluster_two_shards_localhost","shard":"2","rows":"1","bytes":"8"}

View File

@ -0,0 +1,91 @@
#!/usr/bin/env bash
# Tags: no-fasttest, distributed
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# This function takes 4 arguments:
# $1 - OpenTelemetry Trace Id
# $2 - value of insert_distributed_sync
# $3 - value of prefer_localhost_replica
# $4 - a String that helps to debug
function insert()
{
echo "INSERT INTO ${CLICKHOUSE_DATABASE}.dist_opentelemetry SETTINGS insert_distributed_sync=$2, prefer_localhost_replica=$3 VALUES(1),(2)" |
${CLICKHOUSE_CURL} \
-X POST \
-H "traceparent: 00-$1-5150000000000515-01" \
-H "tracestate: $4" \
"${CLICKHOUSE_URL}" \
--data @-
}
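# The traceparent value above follows the W3C Trace Context layout
# ("00-<32 hex trace id>-<16 hex parent span id>-<2 hex flags>"), so the first
# argument must be a 32-character lowercase hex string. A typical call, as used
# in the tests below:
#
#   trace_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(generateUUIDv4()))")
#   insert "$trace_id" 0 1 "async-insert-writeToLocal"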
function check_span()
{
${CLICKHOUSE_CLIENT} -nq "
SYSTEM FLUSH LOGS;
SELECT operation_name,
attribute['clickhouse.cluster'] AS cluster,
attribute['clickhouse.shard_num'] AS shard,
attribute['clickhouse.rows'] AS rows,
attribute['clickhouse.bytes'] AS bytes
FROM system.opentelemetry_span_log
WHERE finish_date >= yesterday()
AND lower(hex(trace_id)) = '${1}'
AND attribute['clickhouse.distributed'] = '${CLICKHOUSE_DATABASE}.dist_opentelemetry'
AND attribute['clickhouse.remote'] = '${CLICKHOUSE_DATABASE}.local_opentelemetry'
ORDER BY attribute['clickhouse.shard_num']
Format JSONEachRow
;"
}
#
# Prepare tables for tests
#
${CLICKHOUSE_CLIENT} -nq "
DROP TABLE IF EXISTS ${CLICKHOUSE_DATABASE}.dist_opentelemetry;
DROP TABLE IF EXISTS ${CLICKHOUSE_DATABASE}.local_opentelemetry;
CREATE TABLE ${CLICKHOUSE_DATABASE}.dist_opentelemetry (key UInt64) Engine=Distributed('test_cluster_two_shards_localhost', ${CLICKHOUSE_DATABASE}, local_opentelemetry, key % 2);
CREATE TABLE ${CLICKHOUSE_DATABASE}.local_opentelemetry (key UInt64) Engine=MergeTree ORDER BY key;
"
#
# test1
#
trace_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(generateUUIDv4()))");
insert $trace_id 0 1 "async-insert-writeToLocal"
check_span $trace_id
#
# test2
#
trace_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(generateUUIDv4()))");
insert $trace_id 0 0 "async-insert-writeToRemote"
check_span $trace_id
#
# test3
#
trace_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(generateUUIDv4()))");
insert $trace_id 1 1 "sync-insert-writeToLocal"
check_span $trace_id
#
# test4
#
trace_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(generateUUIDv4()))");
insert $trace_id 1 0 "sync-insert-writeToRemote"
check_span $trace_id
#
# Cleanup
#
${CLICKHOUSE_CLIENT} -nq "
DROP TABLE ${CLICKHOUSE_DATABASE}.dist_opentelemetry;
DROP TABLE ${CLICKHOUSE_DATABASE}.local_opentelemetry;
"

View File

@ -0,0 +1,6 @@
default data 2 Row 2:\nColumn 0, name: c1, type: UInt8, parsed text: "2"\nColumn 1, name: c2, type: UInt8, ERROR: text "a<LINE FEED>b,3<LINE FEED>4,4<LINE FEED>" is not like UInt8 2,a
default data 3 Row 3:\nColumn 0, name: c1, type: UInt8, ERROR: text "b,3<LINE FEED>4,4<LINE FEED>5," is not like UInt8 b,3
default data 5 Row 5:\nColumn 0, name: c1, type: UInt8, parsed text: "5"\nColumn 1, name: c2, type: UInt8, ERROR: text "c<LINE FEED>6,6<LINE FEED>" is not like UInt8 5,c
\N data 2 Row 2:\nColumn 0, name: A, type: UInt8, parsed text: "2"\nColumn 1, name: B, type: UInt8, ERROR: text "a<LINE FEED>b,3<LINE FEED>4,4<LINE FEED>" is not like UInt8 2,a
\N data 3 Row 3:\nColumn 0, name: A, type: UInt8, ERROR: text "b,3<LINE FEED>4,4<LINE FEED>5," is not like UInt8 b,3
\N data 5 Row 5:\nColumn 0, name: A, type: UInt8, parsed text: "5"\nColumn 1, name: B, type: UInt8, ERROR: text "c<LINE FEED>6,6<LINE FEED>" is not like UInt8 5,c

View File

@ -0,0 +1,35 @@
#!/usr/bin/env bash
# Tags: no-parallel, no-fasttest
set -eu
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# Data preparation.
CLICKHOUSE_USER_FILES_PATH=$(clickhouse-client --query "select _path, _file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}')
mkdir -p ${CLICKHOUSE_USER_FILES_PATH}/
echo -e "1,1\n2,a\nb,3\n4,4\n5,c\n6,6" > ${CLICKHOUSE_USER_FILES_PATH}/a.csv
${CLICKHOUSE_CLIENT} --query "drop table if exists data;"
${CLICKHOUSE_CLIENT} --query "create table data (A UInt8, B UInt8) engine=MergeTree() order by A;"
# Server side
${CLICKHOUSE_CLIENT} --input_format_allow_errors_num 4 --input_format_record_errors_file_path "errors_server" --query "insert into data select * from file('a.csv', 'CSV', 'c1 UInt8, c2 UInt8');"
sleep 2
${CLICKHOUSE_CLIENT} --query "select * except (time) from file('errors_server', 'CSV', 'time DateTime, database Nullable(String), table Nullable(String), offset UInt32, reason String, raw_data String');"
# Client side
${CLICKHOUSE_CLIENT} --input_format_allow_errors_num 4 --input_format_record_errors_file_path "${CLICKHOUSE_USER_FILES_PATH}/errors_client" --query "insert into data(A, B) format CSV" < ${CLICKHOUSE_USER_FILES_PATH}/a.csv
sleep 2
${CLICKHOUSE_CLIENT} --query "select * except (time) from file('errors_client', 'CSV', 'time DateTime, database Nullable(String), table Nullable(String), offset UInt32, reason String, raw_data String');"
# Restore
${CLICKHOUSE_CLIENT} --query "drop table if exists data;"
rm ${CLICKHOUSE_USER_FILES_PATH}/a.csv
rm ${CLICKHOUSE_USER_FILES_PATH}/errors_server
rm ${CLICKHOUSE_USER_FILES_PATH}/errors_client

View File

@ -0,0 +1,4 @@
{"query":"show processlist format Null\n "}
{"query":"show databases format Null\n "}
{"query":"insert into opentelemetry_test values","read_rows":"3","written_rows":"3"}
{"query":"select * from opentelemetry_test format Null\n ","read_rows":"3","written_rows":""}

View File

@ -0,0 +1,82 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# This function takes 2 arguments:
# $1 - query id
# $2 - query
function execute_query()
{
${CLICKHOUSE_CLIENT} --opentelemetry_start_trace_probability=1 --query_id $1 -nq "
${2}
"
}
# For some queries, it's not possible to know how many bytes/rows are read when tests are executed on CI,
# so we only check db.statement here
function check_query_span_query_only()
{
${CLICKHOUSE_CLIENT} -nq "
SYSTEM FLUSH LOGS;
SELECT attribute['db.statement'] as query
FROM system.opentelemetry_span_log
WHERE finish_date >= yesterday()
AND operation_name = 'query'
AND attribute['clickhouse.query_id'] = '${1}'
Format JSONEachRow
;"
}
function check_query_span()
{
${CLICKHOUSE_CLIENT} -nq "
SYSTEM FLUSH LOGS;
SELECT attribute['db.statement'] as query,
attribute['clickhouse.read_rows'] as read_rows,
attribute['clickhouse.written_rows'] as written_rows
FROM system.opentelemetry_span_log
WHERE finish_date >= yesterday()
AND operation_name = 'query'
AND attribute['clickhouse.query_id'] = '${1}'
Format JSONEachRow
;"
}
#
# Set up
#
${CLICKHOUSE_CLIENT} -nq "
DROP TABLE IF EXISTS ${CLICKHOUSE_DATABASE}.opentelemetry_test;
CREATE TABLE ${CLICKHOUSE_DATABASE}.opentelemetry_test (id UInt64) Engine=MergeTree Order By id;
"
# test 1, a query that takes a special path in the code
# Format Null is used to make sure no output is generated so that it won't pollute the reference file
query_id=$(${CLICKHOUSE_CLIENT} -q "select generateUUIDv4()");
execute_query $query_id 'show processlist format Null'
check_query_span_query_only "$query_id"
# test 2, a normal show command
query_id=$(${CLICKHOUSE_CLIENT} -q "select generateUUIDv4()");
execute_query $query_id 'show databases format Null'
check_query_span_query_only "$query_id"
# test 3, a normal insert query on local table
query_id=$(${CLICKHOUSE_CLIENT} -q "select generateUUIDv4()");
execute_query $query_id 'insert into opentelemetry_test values(1)(2)(3)'
check_query_span "$query_id"
# test 4, a normal select query
query_id=$(${CLICKHOUSE_CLIENT} -q "select generateUUIDv4()");
execute_query $query_id 'select * from opentelemetry_test format Null'
check_query_span $query_id
#
# Tear down
#
${CLICKHOUSE_CLIENT} -q "
DROP TABLE IF EXISTS ${CLICKHOUSE_DATABASE}.opentelemetry_test;
"

View File

@ -0,0 +1,4 @@
-- Tags: no-parallel, no-fasttest
insert into function file(02422_data.msgpack) select toUUID('f4cdd80d-5d15-4bdc-9527-adcca635ec1f') as uuid settings output_format_msgpack_uuid_representation='ext';
select * from file(02422_data.msgpack, auto, 'x Int32'); -- {serverError ILLEGAL_COLUMN}

View File

@ -13,6 +13,8 @@ python3 changelog.py -h
Usage example:
```
git fetch --tags # changelog.py depends on having the tags available, this will fetch them
python3 changelog.py --output=changelog-v22.4.1.2305-prestable.md --gh-user-or-token="$GITHUB_TOKEN" v21.6.2.7-prestable
python3 changelog.py --output=changelog-v22.4.1.2305-prestable.md --gh-user-or-token="$USER" --gh-password="$PASSWORD" v21.6.2.7-prestable
```

View File

@ -0,0 +1,10 @@
#!/bin/bash
set -e
# We check only our own code, which is why we skip contrib
GIT_ROOT=$(git rev-parse --show-cdup)
GIT_ROOT=${GIT_ROOT:-.}
VERSION=$(sed -e '1 s/^v//; 1 s/-.*//p; d' "$GIT_ROOT"/utils/list-versions/version_date.tsv)
find "$GIT_ROOT/docker/server/" -name 'Dockerfile.*' -print0 | xargs -0 sed -i "/^ARG VERSION=/ s/^.*$/ARG VERSION=\"$VERSION\"/"