Merge branch 'master' into revert-40968-s3-sharding-2

This commit is contained in:
alesapin 2022-09-14 11:53:14 +02:00 committed by GitHub
commit 51a302a70f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
108 changed files with 2364 additions and 297 deletions

15
.git-blame-ignore-revs Normal file
View File

@ -0,0 +1,15 @@
# This is a file that can be used by git-blame to ignore some revisions.
# (git 2.23+, released in August 2019)
#
# Can be configured as follow:
#
# $ git config blame.ignoreRevsFile .git-blame-ignore-revs
#
# For more information you can look at git-blame(1) man page.
# Changed tabs to spaces in code [#CLICKHOUSE-3]
137ad95929ee016cc6d3c03bccb5586941c163ff
# dbms/ → src/
# (though it is unlikely that you will see it in blame)
06446b4f08a142d6f1bc30664c47ded88ab51782

View File

@ -220,6 +220,35 @@ ReplxxLineReader::ReplxxLineReader(
rx.bind_key(Replxx::KEY::control('W'), [this](char32_t code) { return rx.invoke(Replxx::ACTION::KILL_TO_WHITESPACE_ON_LEFT, code); });
rx.bind_key(Replxx::KEY::meta('E'), [this](char32_t) { openEditor(); return Replxx::ACTION_RESULT::CONTINUE; });
/// readline insert-comment
auto insert_comment_action = [this](char32_t code)
{
replxx::Replxx::State state(rx.get_state());
const char * line = state.text();
const char * line_end = line + strlen(line);
std::string commented_line;
if (std::find(line, line_end, '\n') != line_end)
{
/// If query has multiple lines, multiline comment is used over
/// commenting each line separately for easier uncomment (though
/// with invoking editor it is simpler to uncomment multiple lines)
///
/// Note, that using multiline comment is OK even with nested
/// comments, since nested comments are supported.
commented_line = fmt::format("/* {} */", state.text());
}
else
{
// In a simplest case use simple comment.
commented_line = fmt::format("-- {}", state.text());
}
rx.set_state(replxx::Replxx::State(commented_line.c_str(), commented_line.size()));
return rx.invoke(Replxx::ACTION::COMMIT_LINE, code);
};
rx.bind_key(Replxx::KEY::meta('#'), insert_comment_action);
}
ReplxxLineReader::~ReplxxLineReader()

View File

@ -370,6 +370,7 @@ else
# Avoid "Setting s3_check_objects_after_upload is neither a builtin setting..."
rm -f /etc/clickhouse-server/users.d/enable_blobs_check.xml ||:
rm -f /etc/clickhouse-server/users.d/marks.xml ||:
# Remove s3 related configs to avoid "there is no disk type `cache`"
rm -f /etc/clickhouse-server/config.d/storage_conf.xml ||:

View File

@ -0,0 +1,34 @@
---
sidebar_position: 1
sidebar_label: 2022
---
# 2022 Changelog
### ClickHouse release v22.8.5.29-lts (74ffb843807) FIXME as compared to v22.8.4.7-lts (baad27bcd2f)
#### New Feature
* Backported in [#40870](https://github.com/ClickHouse/ClickHouse/issues/40870): Add setting to disable limit on kafka_num_consumers. Closes [#40331](https://github.com/ClickHouse/ClickHouse/issues/40331). [#40670](https://github.com/ClickHouse/ClickHouse/pull/40670) ([Kruglov Pavel](https://github.com/Avogar)).
#### Improvement
* Backported in [#40817](https://github.com/ClickHouse/ClickHouse/issues/40817): The setting `show_addresses_in_stack_traces` was accidentally disabled in default `config.xml`. It's removed from the config now, so the setting is enabled by default. [#40749](https://github.com/ClickHouse/ClickHouse/pull/40749) ([Alexander Tokmakov](https://github.com/tavplubix)).
* Backported in [#40944](https://github.com/ClickHouse/ClickHouse/issues/40944): Fix issue with passing MySQL timeouts for MySQL database engine and MySQL table function. Closes [#34168](https://github.com/ClickHouse/ClickHouse/issues/34168)?notification_referrer_id=NT_kwDOAzsV57MzMDMxNjAzNTY5OjU0MjAzODc5. [#40751](https://github.com/ClickHouse/ClickHouse/pull/40751) ([Kseniia Sumarokova](https://github.com/kssenii)).
#### Build/Testing/Packaging Improvement
* Backported in [#41157](https://github.com/ClickHouse/ClickHouse/issues/41157): Add macOS binaries to GH release assets, it fixes [#37718](https://github.com/ClickHouse/ClickHouse/issues/37718). [#41088](https://github.com/ClickHouse/ClickHouse/pull/41088) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
#### Bug Fix (user-visible misbehavior in official stable or prestable release)
* Backported in [#40866](https://github.com/ClickHouse/ClickHouse/issues/40866): - Fix crash while parsing values of type `Object` that contains arrays of variadic dimension. [#40483](https://github.com/ClickHouse/ClickHouse/pull/40483) ([Duc Canh Le](https://github.com/canhld94)).
* Backported in [#40805](https://github.com/ClickHouse/ClickHouse/issues/40805): During insertion of a new query to the `ProcessList` allocations happen. If we reach the memory limit during these allocations we can not use `OvercommitTracker`, because `ProcessList::mutex` is already acquired. Fixes [#40611](https://github.com/ClickHouse/ClickHouse/issues/40611). [#40677](https://github.com/ClickHouse/ClickHouse/pull/40677) ([Dmitry Novik](https://github.com/novikd)).
* Backported in [#40777](https://github.com/ClickHouse/ClickHouse/issues/40777): Fix memory leak while pushing to MVs w/o query context (from Kafka/...). [#40732](https://github.com/ClickHouse/ClickHouse/pull/40732) ([Azat Khuzhin](https://github.com/azat)).
* Backported in [#41135](https://github.com/ClickHouse/ClickHouse/issues/41135): Fix access rights for `DESCRIBE TABLE url()` and some other `DESCRIBE TABLE <table_function>()`. [#40975](https://github.com/ClickHouse/ClickHouse/pull/40975) ([Vitaly Baranov](https://github.com/vitlibar)).
* Backported in [#41242](https://github.com/ClickHouse/ClickHouse/issues/41242): Fixed "possible deadlock avoided" error on automatic conversion of database engine from Ordinary to Atomic. [#41146](https://github.com/ClickHouse/ClickHouse/pull/41146) ([Alexander Tokmakov](https://github.com/tavplubix)).
* Backported in [#41234](https://github.com/ClickHouse/ClickHouse/issues/41234): Fix background clean up of broken detached parts. [#41190](https://github.com/ClickHouse/ClickHouse/pull/41190) ([Kseniia Sumarokova](https://github.com/kssenii)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* use ROBOT_CLICKHOUSE_COMMIT_TOKEN for create-pull-request [#40067](https://github.com/ClickHouse/ClickHouse/pull/40067) ([Yakov Olkhovskiy](https://github.com/yakov-olkhovskiy)).
* use input token instead of env var [#40421](https://github.com/ClickHouse/ClickHouse/pull/40421) ([Yakov Olkhovskiy](https://github.com/yakov-olkhovskiy)).
* CaresPTRResolver small safety improvement [#40890](https://github.com/ClickHouse/ClickHouse/pull/40890) ([Arthur Passos](https://github.com/arthurpassos)).

View File

@ -103,6 +103,7 @@ ClickHouse, Inc. does **not** maintain the tools and libraries listed below and
- [ClickHouse.Client](https://github.com/DarkWanderer/ClickHouse.Client)
- [ClickHouse.Net](https://github.com/ilyabreev/ClickHouse.Net)
- [ClickHouse.Net.Migrations](https://github.com/ilyabreev/ClickHouse.Net.Migrations)
- [Linq To DB](https://github.com/linq2db/linq2db)
- Elixir
- [Ecto](https://github.com/elixir-ecto/ecto)
- [clickhouse_ecto](https://github.com/appodeal/clickhouse_ecto)

View File

@ -32,6 +32,12 @@ SET allow_experimental_lightweight_delete = true;
An [alternative way to delete rows](./alter/delete.md) in ClickHouse is `ALTER TABLE ... DELETE`, which might be more efficient if you do bulk deletes only occasionally and don't need the operation to be applied instantly. In most use cases the new lightweight `DELETE FROM` behavior will be considerably faster.
:::warning
Even though deletes are becoming more lightweight in ClickHouse, they should still not be used as aggressively as on OLTP system. Ligthweight deletes are currently efficient for wide parts, but for compact parts they can be a heavyweight operation, and it may be better to use `ALTER TABLE` for some scenarios.
Even though deletes are becoming more lightweight in ClickHouse, they should still not be used as aggressively as on an OLTP system. Ligthweight deletes are currently efficient for wide parts, but for compact parts they can be a heavyweight operation, and it may be better to use `ALTER TABLE` for some scenarios.
:::
:::note
`DELETE FROM` requires the `ALTER DELETE` privilege:
```sql
grant ALTER DELETE ON db.table to username;
```
:::

View File

@ -227,6 +227,8 @@ void LocalServer::cleanup()
global_context.reset();
}
/// thread status should be destructed before shared context because it relies on process list.
status.reset();
// Delete the temporary directory if needed.
@ -366,7 +368,7 @@ int LocalServer::main(const std::vector<std::string> & /*args*/)
try
{
UseSSL use_ssl;
ThreadStatus thread_status;
thread_status.emplace();
StackTrace::setShowAddresses(config().getBool("show_addresses_in_stack_traces", true));

View File

@ -551,8 +551,9 @@ static void sanityChecks(Server & server)
try
{
const char * filename = "/sys/devices/system/clocksource/clocksource0/current_clocksource";
if (readString(filename).find("tsc") == std::string::npos)
server.context()->addWarningMessage("Linux is not using a fast TSC clock source. Performance can be degraded. Check " + String(filename));
String clocksource = readString(filename);
if (clocksource.find("tsc") == std::string::npos && clocksource.find("kvm-clock") == std::string::npos)
server.context()->addWarningMessage("Linux is not using a fast clock source. Performance can be degraded. Check " + String(filename));
}
catch (...)
{

View File

@ -176,9 +176,6 @@ protected:
bool stderr_is_a_tty = false; /// stderr is a terminal.
uint64_t terminal_width = 0;
ServerConnectionPtr connection;
ConnectionParameters connection_parameters;
String format; /// Query results output format.
bool select_into_file = false; /// If writing result INTO OUTFILE. It affects progress rendering.
bool select_into_file_and_stdout = false; /// If writing result INTO OUTFILE AND STDOUT. It affects progress rendering.
@ -199,6 +196,11 @@ protected:
SharedContextHolder shared_context;
ContextMutablePtr global_context;
std::optional<ThreadStatus> thread_status;
ServerConnectionPtr connection;
ConnectionParameters connection_parameters;
/// Buffer that reads from stdin in batch mode.
ReadBufferFromFileDescriptor std_in{STDIN_FILENO};
/// Console output.

View File

@ -16,7 +16,6 @@ namespace ErrorCodes
extern const int ATTEMPT_TO_READ_AFTER_EOF;
extern const int NETWORK_ERROR;
extern const int SOCKET_TIMEOUT;
extern const int DNS_ERROR;
}
ConnectionEstablisher::ConnectionEstablisher(
@ -91,7 +90,6 @@ void ConnectionEstablisher::run(ConnectionEstablisher::TryResult & result, std::
catch (const Exception & e)
{
if (e.code() != ErrorCodes::NETWORK_ERROR && e.code() != ErrorCodes::SOCKET_TIMEOUT
&& e.code() != ErrorCodes::DNS_ERROR
&& e.code() != ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF)
throw;

View File

@ -31,9 +31,6 @@ LocalConnection::LocalConnection(ContextPtr context_, bool send_progress_, bool
/// Authenticate and create a context to execute queries.
session.authenticate("default", "", Poco::Net::SocketAddress{});
session.makeSessionContext();
if (!CurrentThread::isInitialized())
thread_status.emplace();
}
LocalConnection::~LocalConnection()

View File

@ -156,7 +156,6 @@ private:
String description = "clickhouse-local";
std::optional<LocalQueryState> state;
std::optional<ThreadStatus> thread_status;
/// Last "server" packet.
std::optional<UInt64> next_packet_type;

33
src/Common/Base64.cpp Normal file
View File

@ -0,0 +1,33 @@
#include <Common/Base64.h>
#include <Poco/Base64Decoder.h>
#include <Poco/Base64Encoder.h>
#include <Poco/MemoryStream.h>
#include <Poco/StreamCopier.h>
#include <sstream>
namespace DB
{
std::string base64Encode(const std::string & decoded, bool url_encoding)
{
std::ostringstream ostr; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
ostr.exceptions(std::ios::failbit);
Poco::Base64Encoder encoder(ostr, url_encoding ? Poco::BASE64_URL_ENCODING : 0);
encoder.rdbuf()->setLineLength(0);
encoder << decoded;
encoder.close();
return ostr.str();
}
std::string base64Decode(const std::string & encoded, bool url_encoding)
{
std::string decoded;
Poco::MemoryInputStream istr(encoded.data(), encoded.size());
Poco::Base64Decoder decoder(istr, url_encoding ? Poco::BASE64_URL_ENCODING : 0);
Poco::StreamCopier::copyToString(decoder, decoded);
return decoded;
}
}

12
src/Common/Base64.h Normal file
View File

@ -0,0 +1,12 @@
#pragma once
#include <string>
namespace DB
{
std::string base64Encode(const std::string & decoded, bool url_encoding = false);
std::string base64Decode(const std::string & encoded, bool url_encoding = false);
}

View File

@ -636,6 +636,7 @@
M(665, CANNOT_CONNECT_NATS) \
M(666, CANNOT_USE_CACHE) \
M(667, NOT_INITIALIZED) \
M(668, INVALID_STATE) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -146,6 +146,9 @@
M(SelectedRows, "Number of rows SELECTed from all tables.") \
M(SelectedBytes, "Number of bytes (uncompressed; for columns as they stored in memory) SELECTed from all tables.") \
\
M(WaitMarksLoadMicroseconds, "Time spent loading marks") \
M(BackgroundLoadingMarksTasks, "Number of background tasks for loading marks") \
\
M(Merge, "Number of launched background merges.") \
M(MergedRows, "Rows read for background merges. This is the number of rows before merge.") \
M(MergedUncompressedBytes, "Uncompressed bytes (for columns as they stored in memory) that was read for background merges. This is the number before merge.") \

View File

@ -264,6 +264,18 @@ protected:
}
};
/// Schedule jobs/tasks on global thread pool without implicit passing tracing context on current thread to underlying worker as parent tracing context.
///
/// If you implement your own job/task scheduling upon global thread pool or schedules a long time running job in a infinite loop way,
/// you need to use class, or you need to use ThreadFromGlobalPool below.
///
/// See the comments of ThreadPool below to know how it works.
using ThreadFromGlobalPoolNoTracingContextPropagation = ThreadFromGlobalPoolImpl<false>;
/// An alias of thread that execute jobs/tasks on global thread pool by implicit passing tracing context on current thread to underlying worker as parent tracing context.
/// If jobs/tasks are directly scheduled by using APIs of this class, you need to use this class or you need to use class above.
using ThreadFromGlobalPool = ThreadFromGlobalPoolImpl<true>;
/// Recommended thread pool for the case when multiple thread pools are created and destroyed.
///
/// The template parameter of ThreadFromGlobalPool is set to false to disable tracing context propagation to underlying worker.
@ -274,9 +286,6 @@ protected:
/// which means the tracing context initialized at underlying worker level won't be delete for a very long time.
/// This would cause wrong context for further jobs scheduled in ThreadPool.
///
/// To make sure the tracing context are correctly propagated, we explicitly disable context propagation(including initialization and de-initialization) at underlying worker level.
/// To make sure the tracing context is correctly propagated, we explicitly disable context propagation(including initialization and de-initialization) at underlying worker level.
///
using ThreadPool = ThreadPoolImpl<ThreadFromGlobalPoolImpl<false>>;
/// An alias for user code to execute a job in the global thread pool
using ThreadFromGlobalPool = ThreadFromGlobalPoolImpl<true>;
using ThreadPool = ThreadPoolImpl<ThreadFromGlobalPoolNoTracingContextPropagation>;

View File

@ -605,7 +605,7 @@ void ZooKeeper::removeChildren(const std::string & path)
}
void ZooKeeper::removeChildrenRecursive(const std::string & path, const String & keep_child_node)
void ZooKeeper::removeChildrenRecursive(const std::string & path, RemoveException keep_child)
{
Strings children = getChildren(path);
while (!children.empty())
@ -613,16 +613,23 @@ void ZooKeeper::removeChildrenRecursive(const std::string & path, const String &
Coordination::Requests ops;
for (size_t i = 0; i < MULTI_BATCH_SIZE && !children.empty(); ++i)
{
removeChildrenRecursive(fs::path(path) / children.back());
if (likely(keep_child_node.empty() || keep_child_node != children.back()))
if (keep_child.path.empty() || keep_child.path != children.back()) [[likely]]
{
removeChildrenRecursive(fs::path(path) / children.back());
ops.emplace_back(makeRemoveRequest(fs::path(path) / children.back(), -1));
}
else if (keep_child.remove_subtree)
{
removeChildrenRecursive(fs::path(path) / children.back());
}
children.pop_back();
}
multi(ops);
}
}
bool ZooKeeper::tryRemoveChildrenRecursive(const std::string & path, bool probably_flat, const String & keep_child_node)
bool ZooKeeper::tryRemoveChildrenRecursive(const std::string & path, bool probably_flat, RemoveException keep_child)
{
Strings children;
if (tryGetChildren(path, children) != Coordination::Error::ZOK)
@ -639,16 +646,20 @@ bool ZooKeeper::tryRemoveChildrenRecursive(const std::string & path, bool probab
{
String child_path = fs::path(path) / children.back();
/// Will try to avoid recursive getChildren calls if child_path probably has no children.
/// It may be extremely slow when path contain a lot of leaf children.
if (!probably_flat)
tryRemoveChildrenRecursive(child_path);
if (likely(keep_child_node.empty() || keep_child_node != children.back()))
if (keep_child.path.empty() || keep_child.path != children.back()) [[likely]]
{
/// Will try to avoid recursive getChildren calls if child_path probably has no children.
/// It may be extremely slow when path contain a lot of leaf children.
if (!probably_flat)
tryRemoveChildrenRecursive(child_path);
batch.push_back(child_path);
ops.emplace_back(zkutil::makeRemoveRequest(child_path, -1));
}
else if (keep_child.remove_subtree && !probably_flat)
{
tryRemoveChildrenRecursive(child_path);
}
children.pop_back();
}

View File

@ -58,6 +58,18 @@ struct ShuffleHost
}
};
struct RemoveException
{
explicit RemoveException(std::string_view path_ = "", bool remove_subtree_ = true)
: path(path_)
, remove_subtree(remove_subtree_)
{}
std::string_view path;
// whether we should keep the child node and its subtree or just the child node
bool remove_subtree;
};
using GetPriorityForLoadBalancing = DB::GetPriorityForLoadBalancing;
/// ZooKeeper session. The interface is substantially different from the usual libzookeeper API.
@ -219,13 +231,13 @@ public:
void tryRemoveRecursive(const std::string & path);
/// Similar to removeRecursive(...) and tryRemoveRecursive(...), but does not remove path itself.
/// If keep_child_node is not empty, this method will not remove path/keep_child_node (but will remove its subtree).
/// It can be useful to keep some child node as a flag which indicates that path is currently removing.
void removeChildrenRecursive(const std::string & path, const String & keep_child_node = {});
/// Node defined as RemoveException will not be deleted.
void removeChildrenRecursive(const std::string & path, RemoveException keep_child = RemoveException{});
/// If probably_flat is true, this method will optimistically try to remove children non-recursive
/// and will fall back to recursive removal if it gets ZNOTEMPTY for some child.
/// Returns true if no kind of fallback happened.
bool tryRemoveChildrenRecursive(const std::string & path, bool probably_flat = false, const String & keep_child_node = {});
/// Node defined as RemoveException will not be deleted.
bool tryRemoveChildrenRecursive(const std::string & path, bool probably_flat = false, RemoveException keep_child= RemoveException{});
/// Remove all children nodes (non recursive).
void removeChildren(const std::string & path);

View File

@ -30,6 +30,7 @@ struct Settings;
M(UInt64, snapshot_distance, 100000, "How many log items we have to collect to write new snapshot", 0) \
M(Bool, auto_forwarding, true, "Allow to forward write requests from followers to leader", 0) \
M(Milliseconds, shutdown_timeout, 5000, "How much time we will wait until RAFT shutdown", 0) \
M(Milliseconds, session_shutdown_timeout, 10000, "How much time we will wait until sessions are closed during shutdown", 0) \
M(Milliseconds, startup_timeout, 180000, "How much time we will wait until RAFT to start.", 0) \
M(LogsLevel, raft_logs_level, LogsLevel::information, "Log internal RAFT logs into main server log level. Valid values: 'trace', 'debug', 'information', 'warning', 'error', 'fatal', 'none'", 0) \
M(UInt64, rotate_log_storage_interval, 100000, "How many records will be stored in one log storage file", 0) \

View File

@ -354,9 +354,6 @@ void KeeperDispatcher::shutdown()
update_configuration_thread.join();
}
if (server)
server->shutdown();
KeeperStorage::RequestForSession request_for_session;
/// Set session expired for all pending requests
@ -368,10 +365,58 @@ void KeeperDispatcher::shutdown()
setResponse(request_for_session.session_id, response);
}
/// Clear all registered sessions
std::lock_guard lock(session_to_response_callback_mutex);
session_to_response_callback.clear();
KeeperStorage::RequestsForSessions close_requests;
{
/// Clear all registered sessions
std::lock_guard lock(session_to_response_callback_mutex);
if (hasLeader())
{
close_requests.reserve(session_to_response_callback.size());
// send to leader CLOSE requests for active sessions
for (const auto & [session, response] : session_to_response_callback)
{
auto request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close);
request->xid = Coordination::CLOSE_XID;
using namespace std::chrono;
KeeperStorage::RequestForSession request_info
{
.session_id = session,
.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(),
.request = std::move(request),
};
close_requests.push_back(std::move(request_info));
}
}
session_to_response_callback.clear();
}
// if there is no leader, there is no reason to do CLOSE because it's a write request
if (hasLeader() && !close_requests.empty())
{
LOG_INFO(log, "Trying to close {} session(s)", close_requests.size());
const auto raft_result = server->putRequestBatch(close_requests);
auto sessions_closing_done_promise = std::make_shared<std::promise<void>>();
auto sessions_closing_done = sessions_closing_done_promise->get_future();
raft_result->when_ready([sessions_closing_done_promise = std::move(sessions_closing_done_promise)](
nuraft::cmd_result<nuraft::ptr<nuraft::buffer>> & /*result*/,
nuraft::ptr<std::exception> & /*exception*/) { sessions_closing_done_promise->set_value(); });
auto session_shutdown_timeout = configuration_and_settings->coordination_settings->session_shutdown_timeout.totalMilliseconds();
if (sessions_closing_done.wait_for(std::chrono::milliseconds(session_shutdown_timeout)) != std::future_status::ready)
LOG_WARNING(
log,
"Failed to close sessions in {}ms. If they are not closed, they will be closed after session timeout.",
session_shutdown_timeout);
}
if (server)
server->shutdown();
CurrentMetrics::set(CurrentMetrics::KeeperAliveConnections, 0);
}
catch (...)
{
@ -418,13 +463,15 @@ void KeeperDispatcher::sessionCleanerTask()
LOG_INFO(log, "Found dead session {}, will try to close it", dead_session);
/// Close session == send close request to raft server
Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close);
auto request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close);
request->xid = Coordination::CLOSE_XID;
KeeperStorage::RequestForSession request_info;
request_info.request = request;
using namespace std::chrono;
request_info.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
request_info.session_id = dead_session;
KeeperStorage::RequestForSession request_info
{
.session_id = dead_session,
.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(),
.request = std::move(request),
};
{
std::lock_guard lock(push_request_mutex);
if (!requests_queue->push(std::move(request_info)))

View File

@ -1,11 +1,11 @@
#include <iterator>
#include <variant>
#include <Coordination/KeeperStorage.h>
#include <IO/Operators.h>
#include <IO/WriteHelpers.h>
#include <boost/algorithm/string.hpp>
#include <Poco/Base64Encoder.h>
#include <Poco/SHA1Engine.h>
#include <Common/Base64.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/SipHash.h>
#include <Common/ZooKeeper/ZooKeeperConstants.h>
@ -15,8 +15,11 @@
#include <Common/logger_useful.h>
#include <Common/setThreadName.h>
#include <Common/LockMemoryExceptionInThread.h>
#include <Coordination/pathUtils.h>
#include <Coordination/KeeperConstants.h>
#include <Coordination/KeeperStorage.h>
#include <sstream>
#include <iomanip>
#include <mutex>
@ -36,17 +39,6 @@ namespace ErrorCodes
namespace
{
String base64Encode(const String & decoded)
{
std::ostringstream ostr; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
ostr.exceptions(std::ios::failbit);
Poco::Base64Encoder encoder(ostr);
encoder.rdbuf()->setLineLength(0);
encoder << decoded;
encoder.close();
return ostr.str();
}
String getSHA1(const String & userdata)
{
Poco::SHA1Engine engine;

View File

@ -149,9 +149,9 @@ BackgroundSchedulePool::BackgroundSchedulePool(size_t size_, CurrentMetrics::Met
threads.resize(size_);
for (auto & thread : threads)
thread = ThreadFromGlobalPool([this] { threadFunction(); });
thread = ThreadFromGlobalPoolNoTracingContextPropagation([this] { threadFunction(); });
delayed_thread = ThreadFromGlobalPool([this] { delayExecutionThreadFunction(); });
delayed_thread = ThreadFromGlobalPoolNoTracingContextPropagation([this] { delayExecutionThreadFunction(); });
}
@ -168,7 +168,7 @@ void BackgroundSchedulePool::increaseThreadsCount(size_t new_threads_count)
threads.resize(new_threads_count);
for (size_t i = old_threads_count; i < new_threads_count; ++i)
threads[i] = ThreadFromGlobalPool([this] { threadFunction(); });
threads[i] = ThreadFromGlobalPoolNoTracingContextPropagation([this] { threadFunction(); });
}

View File

@ -57,7 +57,9 @@ public:
~BackgroundSchedulePool();
private:
using Threads = std::vector<ThreadFromGlobalPool>;
/// BackgroundSchedulePool schedules a task on its own task queue, there's no need to construct/restore tracing context on this level.
/// This is also how ThreadPool class treats the tracing context. See ThreadPool for more information.
using Threads = std::vector<ThreadFromGlobalPoolNoTracingContextPropagation>;
void threadFunction();
void delayExecutionThreadFunction();
@ -83,7 +85,7 @@ private:
std::condition_variable delayed_tasks_cond_var;
std::mutex delayed_tasks_mutex;
/// Thread waiting for next delayed task.
ThreadFromGlobalPool delayed_thread;
ThreadFromGlobalPoolNoTracingContextPropagation delayed_thread;
/// Tasks ordered by scheduled time.
DelayedTasks delayed_tasks;

View File

@ -602,6 +602,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, skip_download_if_exceeds_query_cache, true, "Skip download from remote filesystem if exceeds query cache size", 0) \
M(UInt64, max_query_cache_size, (128UL * 1024 * 1024 * 1024), "Max remote filesystem cache size that can be used by a single query", 0) \
\
M(Bool, load_marks_asynchronously, false, "Load MergeTree marks asynchronously", 0) \
\
M(Bool, use_structure_from_insertion_table_in_table_functions, false, "Use structure from insertion table instead of schema inference from data", 0) \
\
M(UInt64, http_max_tries, 10, "Max attempts to read via http.", 0) \
@ -616,6 +618,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, allow_deprecated_database_ordinary, false, "Allow to create databases with deprecated Ordinary engine", 0) \
M(Bool, allow_deprecated_syntax_for_merge_tree, false, "Allow to create *MergeTree tables with deprecated engine definition syntax", 0) \
\
M(Bool, force_grouping_standard_compatibility, true, "Make GROUPING function to return 1 when argument is not used as an aggregation key", 0) \
\
M(Bool, schema_inference_use_cache_for_file, true, "Use cache in schema inference while using file table function", 0) \
M(Bool, schema_inference_use_cache_for_s3, true, "Use cache in schema inference while using s3 table function", 0) \
M(Bool, schema_inference_use_cache_for_hdfs, true, "Use cache in schema inference while using hdfs table function", 0) \

View File

@ -78,6 +78,7 @@ namespace SettingsChangesHistory
/// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> settings_changes_history =
{
{"22.9", {{"force_grouping_standard_compatibility", false, true, "Make GROUPING function output the same as in SQL standard and other DBMS"}}},
{"22.7", {{"cross_to_inner_join_rewrite", 1, 2, "Force rewrite comma join to inner"},
{"enable_positional_arguments", false, true, "Enable positional arguments feature by default"},
{"format_csv_allow_single_quotes", true, false, "Most tools don't treat single quote in CSV specially, don't do it by default too"}}},

View File

@ -1,9 +1,9 @@
#pragma once
#include <base/defines.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnFixedString.h>
#include <Core/ColumnNumbers.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
@ -19,10 +19,17 @@ protected:
static constexpr UInt64 ONE = 1;
const ColumnNumbers arguments_indexes;
// Initial implementation of GROUPING function returned 1 if the argument is used as an aggregation key.
// This differs from the behavior described in the standard and other DBMS.
const bool force_compatibility;
static constexpr UInt64 COMPATIBLE_MODE[] = {1, 0};
static constexpr UInt64 INCOMPATIBLE_MODE[] = {0, 1};
public:
FunctionGroupingBase(ColumnNumbers arguments_indexes_)
FunctionGroupingBase(ColumnNumbers arguments_indexes_, bool force_compatibility_)
: arguments_indexes(std::move(arguments_indexes_))
, force_compatibility(force_compatibility_)
{}
bool isVariadic() const override { return true; }
@ -48,13 +55,15 @@ public:
auto result = ColumnUInt64::create();
auto & result_data = result->getData();
result_data.reserve(input_rows_count);
const auto * result_table = likely(force_compatibility) ? COMPATIBLE_MODE : INCOMPATIBLE_MODE;
for (size_t i = 0; i < input_rows_count; ++i)
{
UInt64 set_index = grouping_set_column->getElement(i);
UInt64 value = 0;
for (auto index : arguments_indexes)
value = (value << 1) + (checker(set_index, index) ? 1 : 0);
value = (value << 1) + result_table[checker(set_index, index) ? 1 : 0];
result_data.push_back(value);
}
@ -65,14 +74,16 @@ public:
class FunctionGroupingOrdinary : public FunctionGroupingBase
{
public:
explicit FunctionGroupingOrdinary(ColumnNumbers arguments_indexes_)
: FunctionGroupingBase(std::move(arguments_indexes_))
FunctionGroupingOrdinary(ColumnNumbers arguments_indexes_, bool force_compatibility_)
: FunctionGroupingBase(std::move(arguments_indexes_), force_compatibility_)
{}
String getName() const override { return "groupingOrdinary"; }
ColumnPtr executeImpl(const ColumnsWithTypeAndName &, const DataTypePtr &, size_t input_rows_count) const override
{
if (likely(force_compatibility))
return ColumnUInt64::create(input_rows_count, 0);
UInt64 value = (ONE << arguments_indexes.size()) - 1;
return ColumnUInt64::create(input_rows_count, value);
}
@ -83,8 +94,8 @@ class FunctionGroupingForRollup : public FunctionGroupingBase
const UInt64 aggregation_keys_number;
public:
FunctionGroupingForRollup(ColumnNumbers arguments_indexes_, UInt64 aggregation_keys_number_)
: FunctionGroupingBase(std::move(arguments_indexes_))
FunctionGroupingForRollup(ColumnNumbers arguments_indexes_, UInt64 aggregation_keys_number_, bool force_compatibility_)
: FunctionGroupingBase(std::move(arguments_indexes_), force_compatibility_)
, aggregation_keys_number(aggregation_keys_number_)
{}
@ -113,8 +124,8 @@ class FunctionGroupingForCube : public FunctionGroupingBase
public:
FunctionGroupingForCube(ColumnNumbers arguments_indexes_, UInt64 aggregation_keys_number_)
: FunctionGroupingBase(arguments_indexes_)
FunctionGroupingForCube(ColumnNumbers arguments_indexes_, UInt64 aggregation_keys_number_, bool force_compatibility_)
: FunctionGroupingBase(arguments_indexes_, force_compatibility_)
, aggregation_keys_number(aggregation_keys_number_)
{}
@ -142,8 +153,8 @@ class FunctionGroupingForGroupingSets : public FunctionGroupingBase
{
ColumnNumbersSetList grouping_sets;
public:
FunctionGroupingForGroupingSets(ColumnNumbers arguments_indexes_, ColumnNumbersList const & grouping_sets_)
: FunctionGroupingBase(std::move(arguments_indexes_))
FunctionGroupingForGroupingSets(ColumnNumbers arguments_indexes_, ColumnNumbersList const & grouping_sets_, bool force_compatibility_)
: FunctionGroupingBase(std::move(arguments_indexes_), force_compatibility_)
{
for (auto const & set : grouping_sets_)
grouping_sets.emplace_back(set.begin(), set.end());

View File

@ -76,6 +76,8 @@ struct ReadSettings
/// For 'pread_threadpool' method. Lower is more priority.
size_t priority = 0;
bool load_marks_asynchronously = true;
size_t remote_fs_read_max_backoff_ms = 10000;
size_t remote_fs_read_backoff_max_tries = 4;

View File

@ -880,20 +880,20 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data &
{
case GroupByKind::GROUPING_SETS:
{
data.addFunction(std::make_shared<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionGroupingForGroupingSets>(std::move(arguments_indexes), keys_info.grouping_set_keys)), { "__grouping_set" }, column_name);
data.addFunction(std::make_shared<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionGroupingForGroupingSets>(std::move(arguments_indexes), keys_info.grouping_set_keys, data.getContext()->getSettingsRef().force_grouping_standard_compatibility)), { "__grouping_set" }, column_name);
break;
}
case GroupByKind::ROLLUP:
data.addFunction(std::make_shared<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionGroupingForRollup>(std::move(arguments_indexes), aggregation_keys_number)), { "__grouping_set" }, column_name);
data.addFunction(std::make_shared<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionGroupingForRollup>(std::move(arguments_indexes), aggregation_keys_number, data.getContext()->getSettingsRef().force_grouping_standard_compatibility)), { "__grouping_set" }, column_name);
break;
case GroupByKind::CUBE:
{
data.addFunction(std::make_shared<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionGroupingForCube>(std::move(arguments_indexes), aggregation_keys_number)), { "__grouping_set" }, column_name);
data.addFunction(std::make_shared<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionGroupingForCube>(std::move(arguments_indexes), aggregation_keys_number, data.getContext()->getSettingsRef().force_grouping_standard_compatibility)), { "__grouping_set" }, column_name);
break;
}
case GroupByKind::ORDINARY:
{
data.addFunction(std::make_shared<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionGroupingOrdinary>(std::move(arguments_indexes))), {}, column_name);
data.addFunction(std::make_shared<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionGroupingOrdinary>(std::move(arguments_indexes), data.getContext()->getSettingsRef().force_grouping_standard_compatibility)), {}, column_name);
break;
}
default:

View File

@ -141,7 +141,13 @@ AsynchronousInsertQueue::~AsynchronousInsertQueue()
{
/// TODO: add a setting for graceful shutdown.
shutdown = true;
LOG_TRACE(log, "Shutting down the asynchronous insertion queue");
{
std::lock_guard lock(shutdown_mutex);
shutdown = true;
shutdown_cv.notify_all();
}
assert(dump_by_first_update_thread.joinable());
dump_by_first_update_thread.join();
@ -162,6 +168,8 @@ AsynchronousInsertQueue::~AsynchronousInsertQueue()
ErrorCodes::TIMEOUT_EXCEEDED,
"Wait for async insert timeout exceeded)")));
}
LOG_TRACE(log, "Asynchronous insertion queue finished");
}
void AsynchronousInsertQueue::scheduleDataProcessingJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context)
@ -276,10 +284,8 @@ void AsynchronousInsertQueue::busyCheck()
{
auto timeout = busy_timeout;
while (!shutdown)
while (!waitForShutdown(timeout))
{
std::this_thread::sleep_for(timeout);
/// TODO: use priority queue instead of raw unsorted queue.
timeout = busy_timeout;
std::shared_lock read_lock(rwlock);
@ -301,9 +307,8 @@ void AsynchronousInsertQueue::busyCheck()
void AsynchronousInsertQueue::staleCheck()
{
while (!shutdown)
while (!waitForShutdown(stale_timeout))
{
std::this_thread::sleep_for(stale_timeout);
std::shared_lock read_lock(rwlock);
for (auto & [key, elem] : queue)
@ -325,9 +330,8 @@ void AsynchronousInsertQueue::cleanup()
/// because it holds exclusive lock.
auto timeout = busy_timeout * 5;
while (!shutdown)
while (!waitForShutdown(timeout))
{
std::this_thread::sleep_for(timeout);
std::vector<InsertQuery> keys_to_remove;
{
@ -379,6 +383,12 @@ void AsynchronousInsertQueue::cleanup()
}
}
bool AsynchronousInsertQueue::waitForShutdown(const Milliseconds & timeout)
{
std::unique_lock shutdown_lock(shutdown_mutex);
return shutdown_cv.wait_for(shutdown_lock, timeout, [this]() { return shutdown; });
}
// static
void AsynchronousInsertQueue::processData(InsertQuery key, InsertDataPtr data, ContextPtr global_context)
try

View File

@ -115,7 +115,10 @@ private:
const Milliseconds busy_timeout;
const Milliseconds stale_timeout;
std::atomic<bool> shutdown{false};
std::mutex shutdown_mutex;
std::condition_variable shutdown_cv;
bool shutdown{false};
ThreadPool pool; /// dump the data only inside this pool.
ThreadFromGlobalPool dump_by_first_update_thread; /// uses busy_timeout and busyCheck()
ThreadFromGlobalPool dump_by_last_update_thread; /// uses stale_timeout and staleCheck()
@ -136,6 +139,10 @@ private:
template <typename E>
static void finishWithException(const ASTPtr & query, const std::list<InsertData::EntryPtr> & entries, const E & exception);
/// @param timeout - time to wait
/// @return true if shutdown requested
bool waitForShutdown(const Milliseconds & timeout);
public:
auto getQueueLocked() const
{

View File

@ -143,7 +143,7 @@ namespace ErrorCodes
/** Set of known objects (environment), that could be used in query.
* Shared (global) part. Order of members (especially, order of destruction) is very important.
*/
struct ContextSharedPart
struct ContextSharedPart : boost::noncopyable
{
Poco::Logger * log = &Poco::Logger::get("Context");
@ -215,6 +215,7 @@ struct ContextSharedPart
std::unique_ptr<AccessControl> access_control;
mutable UncompressedCachePtr uncompressed_cache; /// The cache of decompressed blocks.
mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files.
mutable std::unique_ptr<ThreadPool> load_marks_threadpool; /// Threadpool for loading marks cache.
mutable UncompressedCachePtr index_uncompressed_cache; /// The cache of decompressed blocks for MergeTree indices.
mutable MarkCachePtr index_mark_cache; /// Cache of marks in compressed files of MergeTree indices.
mutable MMappedFileCachePtr mmap_cache; /// Cache of mmapped files to avoid frequent open/map/unmap/close and to reuse from several threads.
@ -313,11 +314,19 @@ struct ContextSharedPart
~ContextSharedPart()
{
/// Wait for thread pool for background writes,
/// since it may use per-user MemoryTracker which will be destroyed here.
try
{
/// Wait for thread pool for background writes,
/// since it may use per-user MemoryTracker which will be destroyed here.
IObjectStorage::getThreadPoolWriter().wait();
/// Make sure that threadpool is destructed before this->process_list
/// because thread_status, which was created for threads inside threadpool,
/// relies on it.
if (load_marks_threadpool)
{
load_marks_threadpool->wait();
load_marks_threadpool.reset();
}
}
catch (...)
{
@ -1688,6 +1697,17 @@ void Context::dropMarkCache() const
shared->mark_cache->reset();
}
ThreadPool & Context::getLoadMarksThreadpool() const
{
auto lock = getLock();
if (!shared->load_marks_threadpool)
{
constexpr size_t pool_size = 50;
constexpr size_t queue_size = 1000000;
shared->load_marks_threadpool = std::make_unique<ThreadPool>(pool_size, pool_size, queue_size);
}
return *shared->load_marks_threadpool;
}
void Context::setIndexUncompressedCache(size_t max_size_in_bytes)
{
@ -3429,6 +3449,8 @@ ReadSettings Context::getReadSettings() const
res.local_fs_prefetch = settings.local_filesystem_read_prefetch;
res.remote_fs_prefetch = settings.remote_filesystem_read_prefetch;
res.load_marks_asynchronously = settings.load_marks_asynchronously;
res.remote_fs_read_max_backoff_ms = settings.remote_fs_read_max_backoff_ms;
res.remote_fs_read_backoff_max_tries = settings.remote_fs_read_backoff_max_tries;
res.enable_filesystem_cache = settings.enable_filesystem_cache;

View File

@ -806,6 +806,7 @@ public:
void setMarkCache(size_t cache_size_in_bytes, const String & mark_cache_policy);
std::shared_ptr<MarkCache> getMarkCache() const;
void dropMarkCache() const;
ThreadPool & getLoadMarksThreadpool() const;
/// Create a cache of index uncompressed blocks of specified size. This can be done only once.
void setIndexUncompressedCache(size_t max_size_in_bytes);

View File

@ -890,7 +890,7 @@ void DDLWorker::cleanupQueue(Int64, const ZooKeeperPtr & zookeeper)
/// We recursively delete all nodes except node_path/finished to prevent staled hosts from
/// creating node_path/active node (see createStatusDirs(...))
zookeeper->tryRemoveChildrenRecursive(node_path, /* probably_flat */ false, "finished");
zookeeper->tryRemoveChildrenRecursive(node_path, /* probably_flat */ false, zkutil::RemoveException{"finished"});
/// And then we remove node_path and node_path/finished in a single transaction
Coordination::Requests ops;

View File

@ -539,7 +539,7 @@ BlockIO InterpreterSystemQuery::execute()
{
getContext()->checkAccess(AccessType::SYSTEM_UNFREEZE);
/// The result contains information about deleted parts as a table. It is for compatibility with ALTER TABLE UNFREEZE query.
result = Unfreezer().unfreeze(query.backup_name, getContext());
result = Unfreezer(getContext()).systemUnfreeze(query.backup_name);
break;
}
default:

View File

@ -473,6 +473,7 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
dependencies = getAllColumnDependencies(metadata_snapshot, updated_columns);
std::vector<String> read_columns;
/// First, break a sequence of commands into stages.
for (auto & command : commands)
{
@ -706,17 +707,23 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
else if (command.type == MutationCommand::READ_COLUMN)
{
mutation_kind.set(MutationKind::MUTATE_OTHER);
if (stages.empty() || !stages.back().column_to_updated.empty())
stages.emplace_back(context);
if (stages.size() == 1) /// First stage only supports filtering and can't update columns.
stages.emplace_back(context);
stages.back().column_to_updated.emplace(command.column_name, std::make_shared<ASTIdentifier>(command.column_name));
read_columns.emplace_back(command.column_name);
}
else
throw Exception("Unknown mutation command type: " + DB::toString<int>(command.type), ErrorCodes::UNKNOWN_MUTATION_COMMAND);
}
if (!read_columns.empty())
{
if (stages.empty() || !stages.back().column_to_updated.empty())
stages.emplace_back(context);
if (stages.size() == 1) /// First stage only supports filtering and can't update columns.
stages.emplace_back(context);
for (auto & column_name : read_columns)
stages.back().column_to_updated.emplace(column_name, std::make_shared<ASTIdentifier>(column_name));
}
/// We care about affected indices and projections because we also need to rewrite them
/// when one of index columns updated or filtered with delete.
/// The same about columns, that are needed for calculation of TTL expressions.

View File

@ -28,6 +28,11 @@ namespace CurrentMetrics
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace
{
@ -84,10 +89,13 @@ void SortedBlocksWriter::insert(Block && block)
size_t bytes = 0;
size_t flush_no = 0;
if (!block.rows())
return;
{
std::lock_guard lock{insert_mutex};
/// insert bock into BlocksList undef lock
/// insert block into BlocksList under lock
inserted_blocks.insert(std::move(block));
size_t total_row_count = inserted_blocks.row_count + row_count_in_flush;
@ -145,7 +153,7 @@ SortedBlocksWriter::TmpFilePtr SortedBlocksWriter::flush(const BlocksList & bloc
pipes.emplace_back(std::make_shared<SourceFromSingleChunk>(block.cloneEmpty(), Chunk(block.getColumns(), num_rows)));
if (pipes.empty())
return {};
throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty block");
QueryPipelineBuilder pipeline;
pipeline.init(Pipe::unitePipes(std::move(pipes)));

View File

@ -201,6 +201,10 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
if (!filesystem_cache_path.empty())
settings.ostr << (settings.hilite ? hilite_none : "") << " " << filesystem_cache_path;
}
else if (type == Type::UNFREEZE)
{
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(backup_name);
}
}

View File

@ -113,6 +113,11 @@ NamesAndTypesList IRowSchemaReader::readSchema()
"Most likely setting input_format_max_rows_to_read_for_schema_inference is set to 0");
DataTypes data_types = readRowAndGetDataTypes();
/// Check that we read at list one column.
if (data_types.empty())
throw Exception(ErrorCodes::EMPTY_DATA_PASSED, "Cannot read rows from the data");
/// If column names weren't set, use default names 'c1', 'c2', ...
if (column_names.empty())
{
@ -122,9 +127,11 @@ NamesAndTypesList IRowSchemaReader::readSchema()
}
/// If column names were set, check that the number of names match the number of types.
else if (column_names.size() != data_types.size())
{
throw Exception(
ErrorCodes::INCORRECT_DATA,
"The number of column names {} differs with the number of types {}", column_names.size(), data_types.size());
}
for (size_t i = 0; i != column_names.size(); ++i)
{
@ -155,10 +162,6 @@ NamesAndTypesList IRowSchemaReader::readSchema()
}
}
/// Check that we read at list one column.
if (data_types.empty())
throw Exception(ErrorCodes::EMPTY_DATA_PASSED, "Cannot read rows from the data");
NamesAndTypesList result;
for (size_t i = 0; i != data_types.size(); ++i)
{

View File

@ -140,6 +140,11 @@ namespace
size_t rows = 0;
size_t bytes = 0;
UInt32 shard_num = 0;
std::string cluster;
std::string distributed_table;
std::string remote_table;
/// dumpStructure() of the header -- obsolete
std::string block_header_string;
Block block_header;
@ -195,6 +200,14 @@ namespace
in.getFileName(), distributed_header.revision, DBMS_TCP_PROTOCOL_VERSION);
}
if (header_buf.hasPendingData())
{
readVarUInt(distributed_header.shard_num, header_buf);
readStringBinary(distributed_header.cluster, header_buf);
readStringBinary(distributed_header.distributed_table, header_buf);
readStringBinary(distributed_header.remote_table, header_buf);
}
/// Add handling new data here, for example:
///
/// if (header_buf.hasPendingData())
@ -621,18 +634,23 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
ReadBufferFromFile in(file_path);
const auto & distributed_header = readDistributedHeader(in, log);
auto connection = pool->get(timeouts, &distributed_header.insert_settings);
thread_trace_context = std::make_unique<OpenTelemetry::TracingContextHolder>(__PRETTY_FUNCTION__,
distributed_header.client_info.client_trace_context,
this->storage.getContext()->getOpenTelemetrySpanLog());
thread_trace_context->root_span.addAttribute("clickhouse.shard_num", distributed_header.shard_num);
thread_trace_context->root_span.addAttribute("clickhouse.cluster", distributed_header.cluster);
thread_trace_context->root_span.addAttribute("clickhouse.distributed", distributed_header.distributed_table);
thread_trace_context->root_span.addAttribute("clickhouse.remote", distributed_header.remote_table);
thread_trace_context->root_span.addAttribute("clickhouse.rows", distributed_header.rows);
thread_trace_context->root_span.addAttribute("clickhouse.bytes", distributed_header.bytes);
auto connection = pool->get(timeouts, &distributed_header.insert_settings);
LOG_DEBUG(log, "Sending `{}` to {} ({} rows, {} bytes)",
file_path,
connection->getDescription(),
formatReadableQuantity(distributed_header.rows),
formatReadableSizeWithBinarySuffix(distributed_header.bytes));
thread_trace_context = std::make_unique<OpenTelemetry::TracingContextHolder>(__PRETTY_FUNCTION__,
distributed_header.client_info.client_trace_context,
this->storage.getContext()->getOpenTelemetrySpanLog());
RemoteInserter remote{*connection, timeouts,
distributed_header.insert_query,
distributed_header.insert_settings,

View File

@ -171,7 +171,6 @@ void DistributedSink::writeAsync(const Block & block)
}
else
{
if (storage.getShardingKeyExpr() && (cluster->getShardsInfo().size() > 1))
return writeSplitAsync(block);
@ -291,6 +290,8 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si
auto thread_group = CurrentThread::getGroup();
return [this, thread_group, &job, &current_block, num_shards]()
{
OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__);
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
setThreadName("DistrOutStrProc");
@ -331,15 +332,19 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si
const Block & shard_block = (num_shards > 1) ? job.current_shard_block : current_block;
const Settings & settings = context->getSettingsRef();
/// Do not initiate INSERT for empty block.
size_t rows = shard_block.rows();
span.addAttribute("clickhouse.shard_num", shard_info.shard_num);
span.addAttribute("clickhouse.cluster", this->storage.cluster_name);
span.addAttribute("clickhouse.distributed", this->storage.getStorageID().getFullNameNotQuoted());
span.addAttribute("clickhouse.remote", [this]() { return storage.remote_database + "." + storage.remote_table; });
span.addAttribute("clickhouse.rows", rows);
span.addAttribute("clickhouse.bytes", [&shard_block]() { return toString(shard_block.bytes()); });
/// Do not initiate INSERT for empty block.
if (rows == 0)
return;
OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__);
span.addAttribute("clickhouse.shard_num", shard_info.shard_num);
span.addAttribute("clickhouse.written_rows", rows);
if (!job.is_local_job || !settings.prefer_localhost_replica)
{
if (!job.executor)
@ -610,20 +615,15 @@ void DistributedSink::writeSplitAsync(const Block & block)
void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id)
{
OpenTelemetry::SpanHolder span("DistributedSink::writeAsyncImpl()");
const auto & shard_info = cluster->getShardsInfo()[shard_id];
const auto & settings = context->getSettingsRef();
Block block_to_send = removeSuperfluousColumns(block);
span.addAttribute("clickhouse.shard_num", shard_info.shard_num);
span.addAttribute("clickhouse.written_rows", block.rows());
if (shard_info.hasInternalReplication())
{
if (shard_info.isLocal() && settings.prefer_localhost_replica)
/// Prefer insert into current instance directly
writeToLocal(block_to_send, shard_info.getLocalNodeCount());
writeToLocal(shard_info, block_to_send, shard_info.getLocalNodeCount());
else
{
const auto & path = shard_info.insertPathForInternalReplication(
@ -631,13 +631,13 @@ void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id)
settings.use_compact_format_in_distributed_parts_names);
if (path.empty())
throw Exception("Directory name for async inserts is empty", ErrorCodes::LOGICAL_ERROR);
writeToShard(block_to_send, {path});
writeToShard(shard_info, block_to_send, {path});
}
}
else
{
if (shard_info.isLocal() && settings.prefer_localhost_replica)
writeToLocal(block_to_send, shard_info.getLocalNodeCount());
writeToLocal(shard_info, block_to_send, shard_info.getLocalNodeCount());
std::vector<std::string> dir_names;
for (const auto & address : cluster->getShardsAddresses()[shard_id])
@ -645,30 +645,44 @@ void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id)
dir_names.push_back(address.toFullString(settings.use_compact_format_in_distributed_parts_names));
if (!dir_names.empty())
writeToShard(block_to_send, dir_names);
writeToShard(shard_info, block_to_send, dir_names);
}
}
void DistributedSink::writeToLocal(const Block & block, size_t repeats)
void DistributedSink::writeToLocal(const Cluster::ShardInfo & shard_info, const Block & block, size_t repeats)
{
OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__);
span.addAttribute("db.statement", this->query_string);
span.addAttribute("clickhouse.shard_num", shard_info.shard_num);
span.addAttribute("clickhouse.cluster", this->storage.cluster_name);
span.addAttribute("clickhouse.distributed", this->storage.getStorageID().getFullNameNotQuoted());
span.addAttribute("clickhouse.remote", [this]() { return storage.remote_database + "." + storage.remote_table; });
span.addAttribute("clickhouse.rows", [&block]() { return toString(block.rows()); });
span.addAttribute("clickhouse.bytes", [&block]() { return toString(block.bytes()); });
InterpreterInsertQuery interp(query_ast, context, allow_materialized);
try
{
InterpreterInsertQuery interp(query_ast, context, allow_materialized);
auto block_io = interp.execute();
PushingPipelineExecutor executor(block_io.pipeline);
auto block_io = interp.execute();
PushingPipelineExecutor executor(block_io.pipeline);
executor.start();
writeBlockConvert(executor, block, repeats, log);
executor.finish();
executor.start();
writeBlockConvert(executor, block, repeats, log);
executor.finish();
}
catch (...)
{
span.addAttribute(std::current_exception());
throw;
}
}
void DistributedSink::writeToShard(const Block & block, const std::vector<std::string> & dir_names)
void DistributedSink::writeToShard(const Cluster::ShardInfo & shard_info, const Block & block, const std::vector<std::string> & dir_names)
{
OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__);
span.addAttribute("clickhouse.shard_num", shard_info.shard_num);
const auto & settings = context->getSettingsRef();
const auto & distributed_settings = storage.getDistributedSettingsRef();
@ -759,6 +773,11 @@ void DistributedSink::writeToShard(const Block & block, const std::vector<std::s
header_stream.write(block.cloneEmpty());
}
writeVarUInt(shard_info.shard_num, header_buf);
writeStringBinary(this->storage.cluster_name, header_buf);
writeStringBinary(this->storage.getStorageID().getFullNameNotQuoted(), header_buf);
writeStringBinary(this->storage.remote_database + "." + this->storage.remote_table, header_buf);
/// Add new fields here, for example:
/// writeVarUInt(my_new_data, header_buf);
/// And note that it is safe, because we have checksum and size for header.

View File

@ -69,9 +69,9 @@ private:
Block removeSuperfluousColumns(Block block) const;
/// Increments finished_writings_count after each repeat.
void writeToLocal(const Block & block, size_t repeats);
void writeToLocal(const Cluster::ShardInfo & shard_info, const Block & block, size_t repeats);
void writeToShard(const Block & block, const std::vector<std::string> & dir_names);
void writeToShard(const Cluster::ShardInfo & shard_info, const Block & block, const std::vector<std::string> & dir_names);
/// Performs synchronous insertion to remote nodes. If timeout_exceeded flag was set, throws.

View File

@ -5,12 +5,29 @@
#include <Common/escapeForFileName.h>
#include <Common/logger_useful.h>
/**
* When ClickHouse has frozen data on remote storage it required 'smart' data removing during UNFREEZE.
* For remote storage actually frozen not remote data but local metadata with referrers on remote data.
* So remote data can be referred from working and frozen data sets (or two frozen) at same time.
* In this case during UNFREEZE ClickHouse should remove only local metadata and keep remote data.
* But when data was already removed from working data set ClickHouse should remove remote data too.
* To detect is current data used or not in some other place ClickHouse uses
* - ref_count from metadata to check if data used in some other metadata on the same replica;
* - Keeper record to check if data used on other replica.
* StorageReplicatedMergeTree::removeSharedDetachedPart makes required checks, so here this method
* called for each frozen part.
*/
namespace DB
{
namespace ErrorCodes
{
extern const int SUPPORT_IS_DISABLED;
}
void FreezeMetaData::fill(const StorageReplicatedMergeTree & storage)
{
is_replicated = storage.supportsReplication();
is_remote = storage.isRemote();
replica_name = storage.getReplicaName();
zookeeper_name = storage.getZooKeeperName();
table_shared_id = storage.getTableSharedID();
@ -26,11 +43,17 @@ void FreezeMetaData::save(DiskPtr data_disk, const String & path) const
writeIntText(version, buffer);
buffer.write("\n", 1);
writeBoolText(is_replicated, buffer);
buffer.write("\n", 1);
writeBoolText(is_remote, buffer);
buffer.write("\n", 1);
writeString(replica_name, buffer);
if (version == 1)
{
/// is_replicated and is_remote are not used
bool is_replicated = true;
writeBoolText(is_replicated, buffer);
buffer.write("\n", 1);
bool is_remote = true;
writeBoolText(is_remote, buffer);
buffer.write("\n", 1);
}
writeString(escapeForFileName(replica_name), buffer);
buffer.write("\n", 1);
writeString(zookeeper_name, buffer);
buffer.write("\n", 1);
@ -51,17 +74,25 @@ bool FreezeMetaData::load(DiskPtr data_disk, const String & path)
auto metadata_str = metadata_storage->readFileToString(file_path);
ReadBufferFromString buffer(metadata_str);
readIntText(version, buffer);
if (version != 1)
if (version < 1 || version > 2)
{
LOG_ERROR(&Poco::Logger::get("FreezeMetaData"), "Unknown freezed metadata version: {}", version);
LOG_ERROR(&Poco::Logger::get("FreezeMetaData"), "Unknown frozen metadata version: {}", version);
return false;
}
DB::assertChar('\n', buffer);
readBoolText(is_replicated, buffer);
DB::assertChar('\n', buffer);
readBoolText(is_remote, buffer);
DB::assertChar('\n', buffer);
readString(replica_name, buffer);
if (version == 1)
{
/// is_replicated and is_remote are not used
bool is_replicated;
readBoolText(is_replicated, buffer);
DB::assertChar('\n', buffer);
bool is_remote;
readBoolText(is_remote, buffer);
DB::assertChar('\n', buffer);
}
std::string unescaped_replica_name;
readString(unescaped_replica_name, buffer);
replica_name = unescapeForFileName(unescaped_replica_name);
DB::assertChar('\n', buffer);
readString(zookeeper_name, buffer);
DB::assertChar('\n', buffer);
@ -87,9 +118,23 @@ String FreezeMetaData::getFileName(const String & path)
return fs::path(path) / "frozen_metadata.txt";
}
BlockIO Unfreezer::unfreeze(const String & backup_name, ContextPtr local_context)
Unfreezer::Unfreezer(ContextPtr context) : local_context(context)
{
LOG_DEBUG(log, "Unfreezing backup {}", backup_name);
if (local_context->hasZooKeeper())
zookeeper = local_context->getZooKeeper();
}
BlockIO Unfreezer::systemUnfreeze(const String & backup_name)
{
LOG_DEBUG(log, "Unfreezing backup {}", escapeForFileName(backup_name));
const auto & config = local_context->getConfigRef();
static constexpr auto config_key = "enable_system_unfreeze";
if (!config.getBool(config_key, false))
{
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Support for SYSTEM UNFREEZE query is disabled. You can enable it via '{}' server setting", config_key);
}
auto disks_map = local_context->getDisksMap();
Disks disks;
for (auto & [name, disk]: disks_map)
@ -97,33 +142,38 @@ BlockIO Unfreezer::unfreeze(const String & backup_name, ContextPtr local_context
disks.push_back(disk);
}
auto backup_path = fs::path(backup_directory_prefix) / escapeForFileName(backup_name);
auto store_path = backup_path / "store";
auto store_paths = {backup_path / "store", backup_path / "data"};
PartitionCommandsResultInfo result_info;
for (const auto & disk: disks)
{
if (!disk->exists(store_path))
continue;
for (auto prefix_it = disk->iterateDirectory(store_path); prefix_it->isValid(); prefix_it->next())
for (const auto& store_path: store_paths)
{
auto prefix_directory = store_path / prefix_it->name();
for (auto table_it = disk->iterateDirectory(prefix_directory); table_it->isValid(); table_it->next())
if (!disk->exists(store_path))
continue;
for (auto prefix_it = disk->iterateDirectory(store_path); prefix_it->isValid(); prefix_it->next())
{
auto table_directory = prefix_directory / table_it->name();
auto current_result_info = unfreezePartitionsFromTableDirectory([] (const String &) { return true; }, backup_name, {disk}, table_directory, local_context);
for (auto & command_result : current_result_info)
auto prefix_directory = store_path / prefix_it->name();
for (auto table_it = disk->iterateDirectory(prefix_directory); table_it->isValid(); table_it->next())
{
command_result.command_type = "SYSTEM UNFREEZE";
auto table_directory = prefix_directory / table_it->name();
auto current_result_info = unfreezePartitionsFromTableDirectory(
[](const String &) { return true; }, backup_name, {disk}, table_directory);
for (auto & command_result : current_result_info)
{
command_result.command_type = "SYSTEM UNFREEZE";
}
result_info.insert(
result_info.end(),
std::make_move_iterator(current_result_info.begin()),
std::make_move_iterator(current_result_info.end()));
}
result_info.insert(
result_info.end(),
std::make_move_iterator(current_result_info.begin()),
std::make_move_iterator(current_result_info.end()));
}
}
if (disk->exists(backup_path))
{
/// After unfreezing we need to clear revision.txt file and empty directories
disk->removeRecursive(backup_path);
}
}
@ -136,18 +186,15 @@ BlockIO Unfreezer::unfreeze(const String & backup_name, ContextPtr local_context
return result;
}
bool Unfreezer::removeFreezedPart(DiskPtr disk, const String & path, const String & part_name, ContextPtr local_context)
bool Unfreezer::removeFreezedPart(DiskPtr disk, const String & path, const String & part_name, ContextPtr local_context, zkutil::ZooKeeperPtr zookeeper)
{
if (disk->supportZeroCopyReplication())
{
FreezeMetaData meta;
if (meta.load(disk, path))
{
if (meta.is_replicated)
{
FreezeMetaData::clean(disk, path);
return StorageReplicatedMergeTree::removeSharedDetachedPart(disk, path, part_name, meta.table_shared_id, meta.zookeeper_name, meta.replica_name, "", local_context);
}
FreezeMetaData::clean(disk, path);
return StorageReplicatedMergeTree::removeSharedDetachedPart(disk, path, part_name, meta.table_shared_id, meta.zookeeper_name, meta.replica_name, "", local_context, zookeeper);
}
}
@ -156,7 +203,7 @@ bool Unfreezer::removeFreezedPart(DiskPtr disk, const String & path, const Strin
return false;
}
PartitionCommandsResultInfo Unfreezer::unfreezePartitionsFromTableDirectory(MergeTreeData::MatcherFn matcher, const String & backup_name, const Disks & disks, const fs::path & table_directory, ContextPtr local_context)
PartitionCommandsResultInfo Unfreezer::unfreezePartitionsFromTableDirectory(MergeTreeData::MatcherFn matcher, const String & backup_name, const Disks & disks, const fs::path & table_directory)
{
PartitionCommandsResultInfo result;
@ -180,7 +227,7 @@ PartitionCommandsResultInfo Unfreezer::unfreezePartitionsFromTableDirectory(Merg
const auto & path = it->path();
bool keep_shared = removeFreezedPart(disk, path, partition_directory, local_context);
bool keep_shared = removeFreezedPart(disk, path, partition_directory, local_context, zookeeper);
result.push_back(PartitionCommandResultInfo{
.partition_id = partition_id,

View File

@ -23,9 +23,7 @@ private:
static String getFileName(const String & path);
public:
int version = 1;
bool is_replicated{false};
bool is_remote{false};
int version = 2;
String replica_name;
String zookeeper_name;
String table_shared_id;
@ -34,12 +32,15 @@ public:
class Unfreezer
{
public:
PartitionCommandsResultInfo unfreezePartitionsFromTableDirectory(MergeTreeData::MatcherFn matcher, const String & backup_name, const Disks & disks, const fs::path & table_directory, ContextPtr local_context);
BlockIO unfreeze(const String & backup_name, ContextPtr local_context);
Unfreezer(ContextPtr context);
PartitionCommandsResultInfo unfreezePartitionsFromTableDirectory(MergeTreeData::MatcherFn matcher, const String & backup_name, const Disks & disks, const fs::path & table_directory);
BlockIO systemUnfreeze(const String & backup_name);
private:
ContextPtr local_context;
zkutil::ZooKeeperPtr zookeeper;
Poco::Logger * log = &Poco::Logger::get("Unfreezer");
static constexpr std::string_view backup_directory_prefix = "shadow";
static bool removeFreezedPart(DiskPtr disk, const String & path, const String & part_name, ContextPtr local_context);
static bool removeFreezedPart(DiskPtr disk, const String & path, const String & part_name, ContextPtr local_context, zkutil::ZooKeeperPtr zookeeper);
};
}

View File

@ -1992,7 +1992,7 @@ size_t MergeTreeData::clearOldBrokenPartsFromDetachedDirectory()
for (auto & [old_name, new_name, disk] : renamed_parts.old_and_new_names)
{
removeDetachedPart(disk, fs::path(relative_data_path) / "detached" / new_name / "", old_name, false);
removeDetachedPart(disk, fs::path(relative_data_path) / "detached" / new_name / "", old_name);
LOG_DEBUG(log, "Removed broken detached part {} due to a timeout for broken detached parts", old_name);
old_name.clear();
}
@ -4744,7 +4744,7 @@ void MergeTreeData::dropDetached(const ASTPtr & partition, bool part, ContextPtr
for (auto & [old_name, new_name, disk] : renamed_parts.old_and_new_names)
{
bool keep_shared = removeDetachedPart(disk, fs::path(relative_data_path) / "detached" / new_name / "", old_name, false);
bool keep_shared = removeDetachedPart(disk, fs::path(relative_data_path) / "detached" / new_name / "", old_name);
LOG_DEBUG(log, "Dropped detached part {}, keep shared data: {}", old_name, keep_shared);
old_name.clear();
}
@ -6411,7 +6411,7 @@ PartitionCommandsResultInfo MergeTreeData::unfreezeAll(
return unfreezePartitionsByMatcher([] (const String &) { return true; }, backup_name, local_context);
}
bool MergeTreeData::removeDetachedPart(DiskPtr disk, const String & path, const String &, bool)
bool MergeTreeData::removeDetachedPart(DiskPtr disk, const String & path, const String &)
{
disk->removeRecursive(path);
@ -6426,7 +6426,7 @@ PartitionCommandsResultInfo MergeTreeData::unfreezePartitionsByMatcher(MatcherFn
auto disks = getStoragePolicy()->getDisks();
return Unfreezer().unfreezePartitionsFromTableDirectory(matcher, backup_name, disks, backup_path, local_context);
return Unfreezer(local_context).unfreezePartitionsFromTableDirectory(matcher, backup_name, disks, backup_path);
}
bool MergeTreeData::canReplacePartition(const DataPartPtr & src_part) const

View File

@ -970,7 +970,7 @@ public:
/// Check shared data usage on other replicas for detached/freezed part
/// Remove local files and remote files if needed
virtual bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name, bool is_freezed);
virtual bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name);
virtual String getTableSharedID() const { return ""; }

View File

@ -2,6 +2,7 @@
#include <DataTypes/NestedUtils.h>
#include <Storages/MergeTree/MergeTreeReaderCompact.h>
#include <Storages/MergeTree/MergeTreeDataPartWriterCompact.h>
#include <Interpreters/Context.h>
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
@ -47,9 +48,11 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartCompact::getReader(
const ReadBufferFromFileBase::ProfileCallback & profile_callback) const
{
auto read_info = std::make_shared<LoadedMergeTreeDataPartInfoForReader>(shared_from_this());
auto * load_marks_threadpool = reader_settings.read_settings.load_marks_asynchronously ? &read_info->getContext()->getLoadMarksThreadpool() : nullptr;
return std::make_unique<MergeTreeReaderCompact>(
read_info, columns_to_read, metadata_snapshot, uncompressed_cache,
mark_cache, mark_ranges, reader_settings,
mark_cache, mark_ranges, reader_settings, load_marks_threadpool,
avg_value_size_hints, profile_callback);
}

View File

@ -1,4 +1,5 @@
#include <Storages/MergeTree/MergeTreeIndexReader.h>
#include <Interpreters/Context.h>
namespace
{
@ -15,6 +16,9 @@ std::unique_ptr<MergeTreeReaderStream> makeIndexReader(
UncompressedCache * uncompressed_cache,
MergeTreeReaderSettings settings)
{
auto context = part->storage.getContext();
auto * load_marks_threadpool = settings.read_settings.load_marks_asynchronously ? &context->getLoadMarksThreadpool() : nullptr;
return std::make_unique<MergeTreeReaderStream>(
part->data_part_storage,
index->getFileName(), extension, marks_count,
@ -22,7 +26,7 @@ std::unique_ptr<MergeTreeReaderStream> makeIndexReader(
std::move(settings), mark_cache, uncompressed_cache,
part->getFileSizeOrZero(index->getFileName() + extension),
&part->index_granularity_info,
ReadBufferFromFileBase::ProfileCallback{}, CLOCK_MONOTONIC_COARSE, false);
ReadBufferFromFileBase::ProfileCallback{}, CLOCK_MONOTONIC_COARSE, false, load_marks_threadpool);
}
}

View File

@ -2,9 +2,19 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <IO/ReadBufferFromFile.h>
#include <Common/setThreadName.h>
#include <Common/scope_guard_safe.h>
#include <Common/CurrentMetrics.h>
#include <Common/ThreadPool.h>
#include <utility>
namespace ProfileEvents
{
extern const Event WaitMarksLoadMicroseconds;
extern const Event BackgroundLoadingMarksTasks;
}
namespace DB
{
@ -23,6 +33,7 @@ MergeTreeMarksLoader::MergeTreeMarksLoader(
const MergeTreeIndexGranularityInfo & index_granularity_info_,
bool save_marks_in_cache_,
const ReadSettings & read_settings_,
ThreadPool * load_marks_threadpool_,
size_t columns_in_mark_)
: data_part_storage(std::move(data_part_storage_))
, mark_cache(mark_cache_)
@ -32,13 +43,41 @@ MergeTreeMarksLoader::MergeTreeMarksLoader(
, save_marks_in_cache(save_marks_in_cache_)
, columns_in_mark(columns_in_mark_)
, read_settings(read_settings_)
, load_marks_threadpool(load_marks_threadpool_)
{
if (load_marks_threadpool)
{
future = loadMarksAsync();
}
}
MergeTreeMarksLoader::~MergeTreeMarksLoader()
{
if (future.valid())
{
future.wait();
}
}
const MarkInCompressedFile & MergeTreeMarksLoader::getMark(size_t row_index, size_t column_index)
{
if (!marks)
loadMarks();
{
Stopwatch watch(CLOCK_MONOTONIC);
if (future.valid())
{
marks = future.get();
future = {};
}
else
{
marks = loadMarks();
}
watch.stop();
ProfileEvents::increment(ProfileEvents::WaitMarksLoadMicroseconds, watch.elapsedMicroseconds());
}
#ifndef NDEBUG
if (column_index >= columns_in_mark)
@ -95,28 +134,63 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
return res;
}
void MergeTreeMarksLoader::loadMarks()
MarkCache::MappedPtr MergeTreeMarksLoader::loadMarks()
{
MarkCache::MappedPtr loaded_marks;
if (mark_cache)
{
auto key = mark_cache->hash(fs::path(data_part_storage->getFullPath()) / mrk_path);
if (save_marks_in_cache)
{
auto callback = [this]{ return loadMarksImpl(); };
marks = mark_cache->getOrSet(key, callback);
loaded_marks = mark_cache->getOrSet(key, callback);
}
else
{
marks = mark_cache->get(key);
if (!marks)
marks = loadMarksImpl();
loaded_marks = mark_cache->get(key);
if (!loaded_marks)
loaded_marks = loadMarksImpl();
}
}
else
marks = loadMarksImpl();
loaded_marks = loadMarksImpl();
if (!marks)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to load marks: {}", String(fs::path(data_part_storage->getFullPath()) / mrk_path));
if (!loaded_marks)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR, "Failed to load marks: {}",
(fs::path(data_part_storage->getFullPath()) / mrk_path).string());
}
return loaded_marks;
}
std::future<MarkCache::MappedPtr> MergeTreeMarksLoader::loadMarksAsync()
{
ThreadGroupStatusPtr thread_group;
if (CurrentThread::isInitialized() && CurrentThread::get().getThreadGroup())
thread_group = CurrentThread::get().getThreadGroup();
auto task = std::make_shared<std::packaged_task<MarkCache::MappedPtr()>>([thread_group, this]
{
setThreadName("loadMarksThread");
if (thread_group)
CurrentThread::attachTo(thread_group);
SCOPE_EXIT_SAFE({
if (thread_group)
CurrentThread::detachQuery();
});
ProfileEvents::increment(ProfileEvents::BackgroundLoadingMarksTasks);
return loadMarks();
});
auto task_future = task->get_future();
load_marks_threadpool->scheduleOrThrow([task]{ (*task)(); });
return task_future;
}
}

View File

@ -2,11 +2,13 @@
#include <Storages/MergeTree/IDataPartStorage.h>
#include <Storages/MarkCache.h>
#include <IO/ReadSettings.h>
#include <Common/ThreadPool.h>
namespace DB
{
struct MergeTreeIndexGranularityInfo;
class Threadpool;
class MergeTreeMarksLoader
{
@ -21,8 +23,11 @@ public:
const MergeTreeIndexGranularityInfo & index_granularity_info_,
bool save_marks_in_cache_,
const ReadSettings & read_settings_,
ThreadPool * load_marks_threadpool_,
size_t columns_in_mark_ = 1);
~MergeTreeMarksLoader();
const MarkInCompressedFile & getMark(size_t row_index, size_t column_index = 0);
private:
@ -36,8 +41,12 @@ private:
MarkCache::MappedPtr marks;
ReadSettings read_settings;
void loadMarks();
MarkCache::MappedPtr loadMarks();
std::future<MarkCache::MappedPtr> loadMarksAsync();
MarkCache::MappedPtr loadMarksImpl();
std::future<MarkCache::MappedPtr> future;
ThreadPool * load_marks_threadpool;
};
}

View File

@ -22,6 +22,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
MarkCache * mark_cache_,
MarkRanges mark_ranges_,
MergeTreeReaderSettings settings_,
ThreadPool * load_marks_threadpool_,
ValueSizeMap avg_value_size_hints_,
const ReadBufferFromFileBase::ProfileCallback & profile_callback_,
clockid_t clock_type_)
@ -42,6 +43,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
data_part_info_for_read_->getIndexGranularityInfo(),
settings.save_marks_in_cache,
settings.read_settings,
load_marks_threadpool_,
data_part_info_for_read_->getColumns().size())
{
try

View File

@ -26,6 +26,7 @@ public:
MarkCache * mark_cache_,
MarkRanges mark_ranges_,
MergeTreeReaderSettings settings_,
ThreadPool * load_marks_threadpool_,
ValueSizeMap avg_value_size_hints_ = {},
const ReadBufferFromFileBase::ProfileCallback & profile_callback_ = {},
clockid_t clock_type_ = CLOCK_MONOTONIC_COARSE);

View File

@ -16,14 +16,19 @@ namespace ErrorCodes
MergeTreeReaderStream::MergeTreeReaderStream(
DataPartStoragePtr data_part_storage_,
const String & path_prefix_, const String & data_file_extension_, size_t marks_count_,
const String & path_prefix_,
const String & data_file_extension_,
size_t marks_count_,
const MarkRanges & all_mark_ranges_,
const MergeTreeReaderSettings & settings_,
MarkCache * mark_cache_,
UncompressedCache * uncompressed_cache_, size_t file_size_,
UncompressedCache * uncompressed_cache_,
size_t file_size_,
const MergeTreeIndexGranularityInfo * index_granularity_info_,
const ReadBufferFromFileBase::ProfileCallback & profile_callback_, clockid_t clock_type_,
bool is_low_cardinality_dictionary_)
const ReadBufferFromFileBase::ProfileCallback & profile_callback_,
clockid_t clock_type_,
bool is_low_cardinality_dictionary_,
ThreadPool * load_marks_cache_threadpool_)
: settings(settings_)
, profile_callback(profile_callback_)
, clock_type(clock_type_)
@ -45,7 +50,8 @@ MergeTreeReaderStream::MergeTreeReaderStream(
marks_count,
*index_granularity_info,
save_marks_in_cache,
settings.read_settings)
settings.read_settings,
load_marks_cache_threadpool_)
{
}

View File

@ -20,13 +20,19 @@ class MergeTreeReaderStream
public:
MergeTreeReaderStream(
DataPartStoragePtr data_part_storage_,
const String & path_prefix_, const String & data_file_extension_, size_t marks_count_,
const String & path_prefix_,
const String & data_file_extension_,
size_t marks_count_,
const MarkRanges & all_mark_ranges,
const MergeTreeReaderSettings & settings_,
MarkCache * mark_cache, UncompressedCache * uncompressed_cache,
size_t file_size_, const MergeTreeIndexGranularityInfo * index_granularity_info_,
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type,
bool is_low_cardinality_dictionary_);
MarkCache * mark_cache,
UncompressedCache * uncompressed_cache,
size_t file_size_,
const MergeTreeIndexGranularityInfo * index_granularity_info_,
const ReadBufferFromFileBase::ProfileCallback & profile_callback,
clockid_t clock_type,
bool is_low_cardinality_dictionary_,
ThreadPool * load_marks_cache_threadpool_);
void seekToMark(size_t index);

View File

@ -6,6 +6,7 @@
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypeNested.h>
#include <Interpreters/inplaceBlockConversions.h>
#include <Interpreters/Context.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Storages/MergeTree/MergeTreeDataPartWide.h>
#include <Common/escapeForFileName.h>
@ -186,12 +187,15 @@ void MergeTreeReaderWide::addStreams(
has_any_stream = true;
bool is_lc_dict = substream_path.size() > 1 && substream_path[substream_path.size() - 2].type == ISerialization::Substream::Type::DictionaryKeys;
auto context = data_part_info_for_read->getContext();
auto * load_marks_threadpool = settings.read_settings.load_marks_asynchronously ? &context->getLoadMarksThreadpool() : nullptr;
streams.emplace(stream_name, std::make_unique<MergeTreeReaderStream>(
data_part_info_for_read->getDataPartStorage(), stream_name, DATA_FILE_EXTENSION,
data_part_info_for_read->getMarksCount(), all_mark_ranges, settings, mark_cache,
uncompressed_cache, data_part_info_for_read->getFileSizeOrZero(stream_name + DATA_FILE_EXTENSION),
&data_part_info_for_read->getIndexGranularityInfo(),
profile_callback, clock_type, is_lc_dict));
profile_callback, clock_type, is_lc_dict, load_marks_threadpool));
};
serialization->enumerateStreams(callback);

View File

@ -203,11 +203,11 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)
}
block_id = temp_part.part->getZeroLevelPartBlockID(block_dedup_token);
LOG_DEBUG(log, "Wrote block with ID '{}', {} rows on {} replicas", block_id, current_block.block.rows(), replicas_num);
LOG_DEBUG(log, "Wrote block with ID '{}', {} rows{}", block_id, current_block.block.rows(), quorumLogMessage(replicas_num));
}
else
{
LOG_DEBUG(log, "Wrote block with {} rows on {} replicas", current_block.block.rows(), replicas_num);
LOG_DEBUG(log, "Wrote block with {} rows{}", current_block.block.rows(), quorumLogMessage(replicas_num));
}
UInt64 elapsed_ns = watch.elapsed();
@ -639,7 +639,7 @@ void ReplicatedMergeTreeSink::waitForQuorum(
size_t replicas_num) const
{
/// We are waiting for quorum to be satisfied.
LOG_TRACE(log, "Waiting for quorum '{}' for part {} on {} replicas", quorum_path, part_name, replicas_num);
LOG_TRACE(log, "Waiting for quorum '{}' for part {}{}", quorum_path, part_name, quorumLogMessage(replicas_num));
try
{
@ -684,6 +684,13 @@ void ReplicatedMergeTreeSink::waitForQuorum(
LOG_TRACE(log, "Quorum '{}' for part {} satisfied", quorum_path, part_name);
}
String ReplicatedMergeTreeSink::quorumLogMessage(size_t replicas_num) const
{
if (!isQuorumEnabled())
return "";
return fmt::format(" (quorum {} of {} replicas)", getQuorumSize(replicas_num), replicas_num);
}
size_t ReplicatedMergeTreeSink::getQuorumSize(size_t replicas_num) const
{
if (!isQuorumEnabled())

View File

@ -96,6 +96,7 @@ private:
size_t getQuorumSize(size_t replicas_num) const;
bool isQuorumEnabled() const;
String quorumLogMessage(size_t replicas_num) const; /// Used in logs for debug purposes
size_t quorum_timeout_ms;
size_t max_parts_per_block;

View File

@ -523,8 +523,7 @@ Chunk StorageEmbeddedRocksDB::getByKeys(
Block StorageEmbeddedRocksDB::getSampleBlock(const Names &) const
{
auto metadata = getInMemoryMetadataPtr();
return metadata ? metadata->getSampleBlock() : Block();
return getInMemoryMetadataPtr()->getSampleBlock();
}
Chunk StorageEmbeddedRocksDB::getBySerializedKeys(

View File

@ -0,0 +1,771 @@
#include <Storages/StorageKeeperMap.h>
#include <Columns/ColumnString.h>
#include <Databases/DatabaseReplicated.h>
#include <Core/NamesAndTypes.h>
#include <Core/UUID.h>
#include <Core/ServerUUID.h>
#include <DataTypes/DataTypeString.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/formatAST.h>
#include <Processors/ISource.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/KVStorageUtils.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Common/Base64.h>
#include <Common/Exception.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/Types.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/ZooKeeperConstants.h>
#include <base/types.h>
#include <boost/algorithm/string/classification.hpp>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int BAD_ARGUMENTS;
extern const int KEEPER_EXCEPTION;
extern const int LOGICAL_ERROR;
extern const int LIMIT_EXCEEDED;
}
namespace
{
std::string formattedAST(const ASTPtr & ast)
{
if (!ast)
return "";
return serializeAST(*ast);
}
void verifyTableId(const StorageID & table_id)
{
if (!table_id.hasUUID())
{
auto database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"KeeperMap cannot be used with '{}' database because it uses {} engine. Please use Atomic or Replicated database",
table_id.getDatabaseName(),
database->getEngineName());
}
}
}
class StorageKeeperMapSink : public SinkToStorage
{
StorageKeeperMap & storage;
std::unordered_map<std::string, std::string> new_values;
size_t primary_key_pos;
public:
StorageKeeperMapSink(StorageKeeperMap & storage_, const StorageMetadataPtr & metadata_snapshot)
: SinkToStorage(metadata_snapshot->getSampleBlock()), storage(storage_)
{
auto primary_key = storage.getPrimaryKey();
assert(primary_key.size() == 1);
primary_key_pos = getHeader().getPositionByName(primary_key[0]);
}
std::string getName() const override { return "StorageKeeperMapSink"; }
void consume(Chunk chunk) override
{
auto rows = chunk.getNumRows();
auto block = getHeader().cloneWithColumns(chunk.detachColumns());
WriteBufferFromOwnString wb_key;
WriteBufferFromOwnString wb_value;
for (size_t i = 0; i < rows; ++i)
{
wb_key.restart();
wb_value.restart();
size_t idx = 0;
for (const auto & elem : block)
{
elem.type->getDefaultSerialization()->serializeBinary(*elem.column, i, idx == primary_key_pos ? wb_key : wb_value);
++idx;
}
auto key = base64Encode(wb_key.str(), /* url_encoding */ true);
new_values[std::move(key)] = std::move(wb_value.str());
}
}
void onFinish() override
{
auto zookeeper = storage.getClient();
Coordination::Requests requests;
auto keys_limit = storage.keysLimit();
size_t current_keys_num = 0;
size_t new_keys_num = 0;
// We use keys limit as a soft limit so we ignore some cases when it can be still exceeded
// (e.g if parallel insert queries are being run)
if (keys_limit != 0)
{
Coordination::Stat data_stat;
zookeeper->get(storage.dataPath(), &data_stat);
current_keys_num = data_stat.numChildren;
}
std::vector<std::pair<const std::string *, std::future<Coordination::ExistsResponse>>> exist_responses;
for (const auto & [key, value] : new_values)
{
auto path = storage.fullPathForKey(key);
exist_responses.push_back({&key, zookeeper->asyncExists(path)});
}
for (auto & [key, response] : exist_responses)
{
if (response.get().error == Coordination::Error::ZOK)
{
requests.push_back(zkutil::makeSetRequest(storage.fullPathForKey(*key), new_values[*key], -1));
}
else
{
requests.push_back(zkutil::makeCreateRequest(storage.fullPathForKey(*key), new_values[*key], zkutil::CreateMode::Persistent));
++new_keys_num;
}
}
if (new_keys_num != 0)
{
auto will_be = current_keys_num + new_keys_num;
if (keys_limit != 0 && will_be > keys_limit)
throw Exception(
ErrorCodes::LIMIT_EXCEEDED,
"Limit would be exceeded by inserting {} new key(s). Limit is {}, while the number of keys would be {}",
new_keys_num,
keys_limit,
will_be);
}
zookeeper->multi(requests);
}
};
template <typename KeyContainer>
class StorageKeeperMapSource : public ISource
{
const StorageKeeperMap & storage;
size_t max_block_size;
using KeyContainerPtr = std::shared_ptr<KeyContainer>;
KeyContainerPtr container;
using KeyContainerIter = typename KeyContainer::const_iterator;
KeyContainerIter it;
KeyContainerIter end;
public:
StorageKeeperMapSource(
const StorageKeeperMap & storage_,
const Block & header,
size_t max_block_size_,
KeyContainerPtr container_,
KeyContainerIter begin_,
KeyContainerIter end_)
: ISource(header), storage(storage_), max_block_size(max_block_size_), container(std::move(container_)), it(begin_), end(end_)
{
}
std::string getName() const override { return "StorageKeeperMapSource"; }
Chunk generate() override
{
if (it >= end)
{
it = {};
return {};
}
using KeyType = typename KeyContainer::value_type;
if constexpr (std::same_as<KeyType, Field>)
{
const auto & sample_block = getPort().getHeader();
const auto & key_column_type = sample_block.getByName(storage.getPrimaryKey().at(0)).type;
auto raw_keys = serializeKeysToRawString(it, end, key_column_type, max_block_size);
for (auto & raw_key : raw_keys)
raw_key = base64Encode(raw_key, /* url_encoding */ true);
return storage.getBySerializedKeys(raw_keys, nullptr);
}
else
{
size_t elem_num = std::min(max_block_size, static_cast<size_t>(end - it));
auto chunk = storage.getBySerializedKeys(std::span{it, it + elem_num}, nullptr);
it += elem_num;
return chunk;
}
}
};
StorageKeeperMap::StorageKeeperMap(
ContextPtr context_,
const StorageID & table_id,
const StorageInMemoryMetadata & metadata,
bool attach,
std::string_view primary_key_,
const std::string & root_path_,
UInt64 keys_limit_)
: IStorage(table_id)
, WithContext(context_->getGlobalContext())
, root_path(zkutil::extractZooKeeperPath(root_path_, false))
, primary_key(primary_key_)
, zookeeper_name(zkutil::extractZooKeeperName(root_path_))
, keys_limit(keys_limit_)
, log(&Poco::Logger::get(fmt::format("StorageKeeperMap ({})", table_id.getNameForLogs())))
{
std::string path_prefix = context_->getConfigRef().getString("keeper_map_path_prefix", "");
if (path_prefix.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "KeeperMap is disabled because 'keeper_map_path_prefix' config is not defined");
verifyTableId(table_id);
setInMemoryMetadata(metadata);
WriteBufferFromOwnString out;
out << "KeeperMap metadata format version: 1\n"
<< "columns: " << metadata.columns.toString()
<< "primary key: " << formattedAST(metadata.getPrimaryKey().expression_list_ast) << "\n";
metadata_string = out.str();
if (root_path.empty())
throw Exception("root_path should not be empty", ErrorCodes::BAD_ARGUMENTS);
if (!root_path.starts_with('/'))
throw Exception("root_path should start with '/'", ErrorCodes::BAD_ARGUMENTS);
auto config_keys_limit = context_->getConfigRef().getUInt64("keeper_map_keys_limit", 0);
if (config_keys_limit != 0 && (keys_limit == 0 || keys_limit > config_keys_limit))
{
LOG_WARNING(
log,
"Keys limit defined by argument ({}) is larger than the one defined by 'keeper_map_keys_limit' config ({}). Will use "
"config defined value",
keys_limit,
config_keys_limit);
keys_limit = config_keys_limit;
}
else if (keys_limit > 0)
{
LOG_INFO(log, "Keys limit will be set to {}", keys_limit);
}
auto root_path_fs = fs::path(path_prefix) / std::string_view{root_path}.substr(1);
root_path = root_path_fs.generic_string();
data_path = root_path_fs / "data";
auto metadata_path_fs = root_path_fs / "metadata";
metadata_path = metadata_path_fs;
tables_path = metadata_path_fs / "tables";
auto table_unique_id = toString(table_id.uuid) + toString(ServerUUID::get());
table_path = fs::path(tables_path) / table_unique_id;
dropped_path = metadata_path_fs / "dropped";
dropped_lock_path = fs::path(dropped_path) / "lock";
if (attach)
{
checkTable<false>();
return;
}
auto client = getClient();
if (root_path != "/" && !client->exists(root_path))
{
LOG_TRACE(log, "Creating root path {}", root_path);
client->createAncestors(root_path);
client->createIfNotExists(root_path, "");
}
for (size_t i = 0; i < 1000; ++i)
{
if (client->exists(dropped_path))
{
LOG_INFO(log, "Removing leftover nodes");
auto code = client->tryCreate(dropped_lock_path, "", zkutil::CreateMode::Ephemeral);
if (code == Coordination::Error::ZNONODE)
{
LOG_INFO(log, "Someone else removed leftover nodes");
}
else if (code == Coordination::Error::ZNODEEXISTS)
{
LOG_INFO(log, "Someone else is removing leftover nodes");
continue;
}
else if (code != Coordination::Error::ZOK)
{
throw Coordination::Exception(code, dropped_lock_path);
}
else
{
auto metadata_drop_lock = zkutil::EphemeralNodeHolder::existing(dropped_lock_path, *client);
if (!dropTable(client, metadata_drop_lock))
continue;
}
}
std::string stored_metadata_string;
auto exists = client->tryGet(metadata_path, stored_metadata_string);
if (exists)
{
// this requires same name for columns
// maybe we can do a smarter comparison for columns and primary key expression
if (stored_metadata_string != metadata_string)
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Path {} is already used but the stored table definition doesn't match. Stored metadata: {}",
root_path,
stored_metadata_string);
}
else
{
auto code = client->tryCreate(metadata_path, metadata_string, zkutil::CreateMode::Persistent);
if (code == Coordination::Error::ZNODEEXISTS)
continue;
else if (code != Coordination::Error::ZOK)
throw Coordination::Exception(code, metadata_path);
}
client->createIfNotExists(tables_path, "");
auto code = client->tryCreate(table_path, "", zkutil::CreateMode::Persistent);
if (code == Coordination::Error::ZOK)
{
// metadata now should be guaranteed to exist because we added our UUID to the tables_path
client->createIfNotExists(data_path, "");
table_is_valid = true;
return;
}
if (code == Coordination::Error::ZNONODE)
LOG_INFO(log, "Metadata nodes were deleted in background, will retry");
else
throw Coordination::Exception(code, table_path);
}
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot create metadata for table, because it is removed concurrently or because of wrong root_path ({})", root_path);
}
Pipe StorageKeeperMap::read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context_,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
unsigned num_streams)
{
checkTable<true>();
storage_snapshot->check(column_names);
FieldVectorPtr filtered_keys;
bool all_scan;
Block sample_block = storage_snapshot->metadata->getSampleBlock();
auto primary_key_type = sample_block.getByName(primary_key).type;
std::tie(filtered_keys, all_scan) = getFilterKeys(primary_key, primary_key_type, query_info, context_);
const auto process_keys = [&]<typename KeyContainerPtr>(KeyContainerPtr keys) -> Pipe
{
if (keys->empty())
return {};
::sort(keys->begin(), keys->end());
keys->erase(std::unique(keys->begin(), keys->end()), keys->end());
Pipes pipes;
size_t num_keys = keys->size();
size_t num_threads = std::min<size_t>(num_streams, keys->size());
assert(num_keys <= std::numeric_limits<uint32_t>::max());
assert(num_threads <= std::numeric_limits<uint32_t>::max());
for (size_t thread_idx = 0; thread_idx < num_threads; ++thread_idx)
{
size_t begin = num_keys * thread_idx / num_threads;
size_t end = num_keys * (thread_idx + 1) / num_threads;
using KeyContainer = typename KeyContainerPtr::element_type;
pipes.emplace_back(std::make_shared<StorageKeeperMapSource<KeyContainer>>(
*this, sample_block, max_block_size, keys, keys->begin() + begin, keys->begin() + end));
}
return Pipe::unitePipes(std::move(pipes));
};
auto client = getClient();
if (all_scan)
return process_keys(std::make_shared<std::vector<std::string>>(client->getChildren(data_path)));
return process_keys(std::move(filtered_keys));
}
SinkToStoragePtr StorageKeeperMap::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/)
{
checkTable<true>();
return std::make_shared<StorageKeeperMapSink>(*this, metadata_snapshot);
}
void StorageKeeperMap::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &)
{
checkTable<true>();
auto client = getClient();
client->tryRemoveChildrenRecursive(data_path, true);
}
bool StorageKeeperMap::dropTable(zkutil::ZooKeeperPtr zookeeper, const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock)
{
zookeeper->removeChildrenRecursive(data_path);
bool completely_removed = false;
Coordination::Requests ops;
ops.emplace_back(zkutil::makeRemoveRequest(metadata_drop_lock->getPath(), -1));
ops.emplace_back(zkutil::makeRemoveRequest(dropped_path, -1));
ops.emplace_back(zkutil::makeRemoveRequest(data_path, -1));
ops.emplace_back(zkutil::makeRemoveRequest(metadata_path, -1));
Coordination::Responses responses;
auto code = zookeeper->tryMulti(ops, responses);
using enum Coordination::Error;
switch (code)
{
case ZOK:
{
metadata_drop_lock->setAlreadyRemoved();
completely_removed = true;
LOG_INFO(log, "Metadata ({}) and data ({}) was successfully removed from ZooKeeper", metadata_path, data_path);
break;
}
case ZNONODE:
throw Exception(ErrorCodes::LOGICAL_ERROR, "There is a race condition between creation and removal of metadata. It's a bug");
case ZNOTEMPTY:
LOG_ERROR(log, "Metadata was not completely removed from ZooKeeper");
break;
default:
zkutil::KeeperMultiException::check(code, ops, responses);
break;
}
return completely_removed;
}
void StorageKeeperMap::drop()
{
checkTable<true>();
auto client = getClient();
client->remove(table_path);
if (!client->getChildren(tables_path).empty())
return;
Coordination::Requests ops;
Coordination::Responses responses;
ops.emplace_back(zkutil::makeRemoveRequest(tables_path, -1));
ops.emplace_back(zkutil::makeCreateRequest(dropped_path, "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(dropped_lock_path, "", zkutil::CreateMode::Ephemeral));
auto code = client->tryMulti(ops, responses);
if (code == Coordination::Error::ZNONODE || code == Coordination::Error::ZNODEEXISTS)
{
LOG_INFO(log, "Metadata is being removed by another table");
return;
}
else if (code == Coordination::Error::ZNOTEMPTY)
{
LOG_WARNING(log, "Another table is using the same path, metadata will not be deleted");
return;
}
else if (code != Coordination::Error::ZOK)
zkutil::KeeperMultiException::check(code, ops, responses);
auto metadata_drop_lock = zkutil::EphemeralNodeHolder::existing(dropped_lock_path, *client);
dropTable(client, metadata_drop_lock);
}
zkutil::ZooKeeperPtr StorageKeeperMap::getClient() const
{
std::lock_guard lock{zookeeper_mutex};
if (!zookeeper_client || zookeeper_client->expired())
{
zookeeper_client = nullptr;
if (zookeeper_name == "default")
zookeeper_client = getContext()->getZooKeeper();
else
zookeeper_client = getContext()->getAuxiliaryZooKeeper(zookeeper_name);
zookeeper_client->sync(root_path);
}
return zookeeper_client;
}
const std::string & StorageKeeperMap::dataPath() const
{
return data_path;
}
std::string StorageKeeperMap::fullPathForKey(const std::string_view key) const
{
return fs::path(data_path) / key;
}
UInt64 StorageKeeperMap::keysLimit() const
{
return keys_limit;
}
std::optional<bool> StorageKeeperMap::isTableValid() const
{
std::lock_guard lock{init_mutex};
if (table_is_valid.has_value())
return *table_is_valid;
[&]
{
try
{
auto client = getClient();
std::string stored_metadata_string;
Coordination::Stat metadata_stat;
client->tryGet(metadata_path, stored_metadata_string, &metadata_stat);
if (metadata_stat.numChildren == 0)
{
table_is_valid = false;
return;
}
if (metadata_string != stored_metadata_string)
{
LOG_ERROR(
log,
"Table definition does not match to the one stored in the path {}. Stored definition: {}",
root_path,
stored_metadata_string);
table_is_valid = false;
return;
}
// validate all metadata and data nodes are present
Coordination::Requests requests;
requests.push_back(zkutil::makeCheckRequest(table_path, -1));
requests.push_back(zkutil::makeCheckRequest(data_path, -1));
requests.push_back(zkutil::makeCheckRequest(dropped_path, -1));
Coordination::Responses responses;
client->tryMulti(requests, responses);
table_is_valid = false;
if (responses[0]->error != Coordination::Error::ZOK)
{
LOG_ERROR(log, "Table node ({}) is missing", table_path);
return;
}
if (responses[1]->error != Coordination::Error::ZOK)
{
LOG_ERROR(log, "Data node ({}) is missing", data_path);
return;
}
if (responses[2]->error == Coordination::Error::ZOK)
{
LOG_ERROR(log, "Tables with root node {} are being dropped", root_path);
return;
}
table_is_valid = true;
}
catch (const Coordination::Exception & e)
{
tryLogCurrentException(log);
if (!Coordination::isHardwareError(e.code))
table_is_valid = false;
}
}();
return table_is_valid;
}
Chunk StorageKeeperMap::getByKeys(const ColumnsWithTypeAndName & keys, PaddedPODArray<UInt8> & null_map, const Names &) const
{
if (keys.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "StorageKeeperMap supports only one key, got: {}", keys.size());
auto raw_keys = serializeKeysToRawString(keys[0]);
if (raw_keys.size() != keys[0].column->size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Assertion failed: {} != {}", raw_keys.size(), keys[0].column->size());
return getBySerializedKeys(raw_keys, &null_map);
}
Chunk StorageKeeperMap::getBySerializedKeys(const std::span<const std::string> keys, PaddedPODArray<UInt8> * null_map) const
{
Block sample_block = getInMemoryMetadataPtr()->getSampleBlock();
MutableColumns columns = sample_block.cloneEmptyColumns();
size_t primary_key_pos = getPrimaryKeyPos(sample_block, getPrimaryKey());
if (null_map)
{
null_map->clear();
null_map->resize_fill(keys.size(), 1);
}
auto client = getClient();
std::vector<std::future<Coordination::GetResponse>> values;
values.reserve(keys.size());
for (const auto & key : keys)
{
const auto full_path = fullPathForKey(key);
values.emplace_back(client->asyncTryGet(full_path));
}
auto wait_until = std::chrono::system_clock::now() + std::chrono::milliseconds(Coordination::DEFAULT_OPERATION_TIMEOUT_MS);
for (size_t i = 0; i < keys.size(); ++i)
{
auto & value = values[i];
if (value.wait_until(wait_until) != std::future_status::ready)
throw DB::Exception(ErrorCodes::KEEPER_EXCEPTION, "Failed to fetch values: timeout");
auto response = value.get();
Coordination::Error code = response.error;
if (code == Coordination::Error::ZOK)
{
fillColumns(base64Decode(keys[i], true), response.data, primary_key_pos, sample_block, columns);
}
else if (code == Coordination::Error::ZNONODE)
{
if (null_map)
{
(*null_map)[i] = 0;
for (size_t col_idx = 0; col_idx < sample_block.columns(); ++col_idx)
columns[col_idx]->insert(sample_block.getByPosition(col_idx).type->getDefault());
}
}
else
{
throw DB::Exception(ErrorCodes::KEEPER_EXCEPTION, "Failed to fetch value: {}", code);
}
}
size_t num_rows = columns.at(0)->size();
return Chunk(std::move(columns), num_rows);
}
Block StorageKeeperMap::getSampleBlock(const Names &) const
{
auto metadata = getInMemoryMetadataPtr();
return metadata->getSampleBlock();
}
void StorageKeeperMap::checkTableCanBeRenamed(const StorageID & new_name) const
{
verifyTableId(new_name);
}
void StorageKeeperMap::rename(const String & /*new_path_to_table_data*/, const StorageID & new_table_id)
{
checkTableCanBeRenamed(new_table_id);
renameInMemory(new_table_id);
}
namespace
{
StoragePtr create(const StorageFactory::Arguments & args)
{
ASTs & engine_args = args.engine_args;
if (engine_args.empty() || engine_args.size() > 2)
throw Exception(
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Storage KeeperMap requires 1-3 arguments:\n"
"root_path: path in the Keeper where the values will be stored (required)\n"
"keys_limit: number of keys allowed to be stored, 0 is no limit (default: 0)");
const auto root_path_node = evaluateConstantExpressionAsLiteral(engine_args[0], args.getLocalContext());
auto root_path = checkAndGetLiteralArgument<std::string>(root_path_node, "root_path");
UInt64 keys_limit = 0;
if (engine_args.size() > 1)
keys_limit = checkAndGetLiteralArgument<UInt64>(engine_args[1], "keys_limit");
StorageInMemoryMetadata metadata;
metadata.setColumns(args.columns);
metadata.setConstraints(args.constraints);
if (!args.storage_def->primary_key)
throw Exception("StorageKeeperMap requires one column in primary key", ErrorCodes::BAD_ARGUMENTS);
metadata.primary_key = KeyDescription::getKeyFromAST(args.storage_def->primary_key->ptr(), metadata.columns, args.getContext());
auto primary_key_names = metadata.getColumnsRequiredForPrimaryKey();
if (primary_key_names.size() != 1)
throw Exception("StorageKeeperMap requires one column in primary key", ErrorCodes::BAD_ARGUMENTS);
return std::make_shared<StorageKeeperMap>(
args.getContext(), args.table_id, metadata, args.query.attach, primary_key_names[0], root_path, keys_limit);
}
}
void registerStorageKeeperMap(StorageFactory & factory)
{
factory.registerStorage(
"KeeperMap",
create,
{
.supports_sort_order = true,
.supports_parallel_insert = true,
});
}
}

View File

@ -0,0 +1,138 @@
#pragma once
#include <Interpreters/Context.h>
#include <Interpreters/IKeyValueEntity.h>
#include <QueryPipeline/Pipe.h>
#include <Storages/IStorage.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Common/PODArray_fwd.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <span>
namespace DB
{
namespace ErrorCodes
{
extern const int INVALID_STATE;
}
// KV store using (Zoo|CH)Keeper
class StorageKeeperMap final : public IStorage, public IKeyValueEntity, WithContext
{
public:
StorageKeeperMap(
ContextPtr context_,
const StorageID & table_id,
const StorageInMemoryMetadata & metadata,
bool attach,
std::string_view primary_key_,
const std::string & root_path_,
UInt64 keys_limit_);
Pipe read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) override;
void truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &) override;
void drop() override;
std::string getName() const override { return "KeeperMap"; }
Names getPrimaryKey() const override { return {primary_key}; }
Chunk getByKeys(const ColumnsWithTypeAndName & keys, PaddedPODArray<UInt8> & null_map, const Names &) const override;
Chunk getBySerializedKeys(std::span<const std::string> keys, PaddedPODArray<UInt8> * null_map) const;
Block getSampleBlock(const Names &) const override;
void checkTableCanBeRenamed(const StorageID & new_name) const override;
void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override;
bool supportsParallelInsert() const override { return true; }
bool supportsIndexForIn() const override { return true; }
bool mayBenefitFromIndexForIn(
const ASTPtr & node, ContextPtr /*query_context*/, const StorageMetadataPtr & /*metadata_snapshot*/) const override
{
return node->getColumnName() == primary_key;
}
zkutil::ZooKeeperPtr getClient() const;
const std::string & dataPath() const;
std::string fullPathForKey(std::string_view key) const;
UInt64 keysLimit() const;
template <bool throw_on_error>
void checkTable() const
{
auto is_table_valid = isTableValid();
if (!is_table_valid.has_value())
{
static constexpr std::string_view error_msg = "Failed to activate table because of connection issues. It will be activated "
"once a connection is established and metadata is verified";
if constexpr (throw_on_error)
throw Exception(ErrorCodes::INVALID_STATE, error_msg);
else
{
LOG_ERROR(log, fmt::runtime(error_msg));
return;
}
}
if (!*is_table_valid)
{
static constexpr std::string_view error_msg
= "Failed to activate table because of invalid metadata in ZooKeeper. Please DETACH table";
if constexpr (throw_on_error)
throw Exception(ErrorCodes::INVALID_STATE, error_msg);
else
{
LOG_ERROR(log, fmt::runtime(error_msg));
return;
}
}
}
private:
bool dropTable(zkutil::ZooKeeperPtr zookeeper, const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock);
std::optional<bool> isTableValid() const;
std::string root_path;
std::string primary_key;
std::string data_path;
std::string metadata_path;
std::string tables_path;
std::string table_path;
std::string dropped_path;
std::string dropped_lock_path;
std::string zookeeper_name;
std::string metadata_string;
uint64_t keys_limit{0};
mutable std::mutex zookeeper_mutex;
mutable zkutil::ZooKeeperPtr zookeeper_client{nullptr};
mutable std::mutex init_mutex;
mutable std::optional<bool> table_is_valid;
Poco::Logger * log;
};
}

View File

@ -3649,7 +3649,7 @@ void StorageReplicatedMergeTree::updateQuorum(const String & part_name, bool is_
if (quorum_entry.replicas.size() >= quorum_entry.required_number_of_replicas)
{
/// The quorum is reached. Delete the node, and update information about the last part that was successfully written with quorum.
LOG_TRACE(log, "Got {} (of {}) replicas confirmed quorum {}, going to remove node",
LOG_TRACE(log, "Got {} (of {} required) replicas confirmed quorum {}, going to remove node",
quorum_entry.replicas.size(), quorum_entry.required_number_of_replicas, quorum_status_path);
Coordination::Requests ops;
@ -8222,25 +8222,12 @@ void StorageReplicatedMergeTree::createZeroCopyLockNode(
}
}
bool StorageReplicatedMergeTree::removeDetachedPart(DiskPtr disk, const String & path, const String & part_name, bool is_freezed)
bool StorageReplicatedMergeTree::removeDetachedPart(DiskPtr disk, const String & path, const String & part_name)
{
if (disk->supportZeroCopyReplication())
{
if (is_freezed)
{
FreezeMetaData meta;
if (meta.load(disk, path))
{
FreezeMetaData::clean(disk, path);
return removeSharedDetachedPart(disk, path, part_name, meta.table_shared_id, meta.zookeeper_name, meta.replica_name, "", getContext());
}
}
else
{
String table_id = getTableSharedID();
return removeSharedDetachedPart(disk, path, part_name, table_id, zookeeper_name, replica_name, zookeeper_path, getContext());
}
String table_id = getTableSharedID();
return removeSharedDetachedPart(disk, path, part_name, table_id, zookeeper_name, replica_name, zookeeper_path, getContext(), current_zookeeper);
}
disk->removeRecursive(path);
@ -8250,11 +8237,10 @@ bool StorageReplicatedMergeTree::removeDetachedPart(DiskPtr disk, const String &
bool StorageReplicatedMergeTree::removeSharedDetachedPart(DiskPtr disk, const String & path, const String & part_name, const String & table_uuid,
const String &, const String & detached_replica_name, const String & detached_zookeeper_path, ContextPtr local_context)
const String &, const String & detached_replica_name, const String & detached_zookeeper_path, ContextPtr local_context, const zkutil::ZooKeeperPtr & zookeeper)
{
bool keep_shared = false;
zkutil::ZooKeeperPtr zookeeper = local_context->getZooKeeper();
NameSet files_not_to_remove;
fs::path checksums = fs::path(path) / IMergeTreeDataPart::FILE_FOR_REFERENCES_CHECK;

View File

@ -325,7 +325,7 @@ public:
void checkBrokenDisks();
static bool removeSharedDetachedPart(DiskPtr disk, const String & path, const String & part_name, const String & table_uuid,
const String & zookeeper_name, const String & replica_name, const String & zookeeper_path, ContextPtr local_context);
const String & zookeeper_name, const String & replica_name, const String & zookeeper_path, ContextPtr local_context, const zkutil::ZooKeeperPtr & zookeeper);
bool canUseZeroCopyReplication() const;
private:
@ -834,7 +834,7 @@ private:
int32_t mode = zkutil::CreateMode::Persistent, bool replace_existing_lock = false,
const String & path_to_set_hardlinked_files = "", const NameSet & hardlinked_files = {});
bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name, bool is_freezed) override;
bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name) override;
/// Create freeze metadata for table and save in zookeeper. Required only if zero-copy replication enabled.
void createAndStoreFreezeMetadata(DiskPtr disk, DataPartPtr part, String backup_part_path) const override;

View File

@ -88,6 +88,7 @@ void registerStorageFileLog(StorageFactory & factory);
void registerStorageSQLite(StorageFactory & factory);
#endif
void registerStorageKeeperMap(StorageFactory & factory);
void registerStorages()
{
@ -171,6 +172,8 @@ void registerStorages()
#if USE_SQLITE
registerStorageSQLite(factory);
#endif
registerStorageKeeperMap(factory);
}
}

View File

@ -0,0 +1,3 @@
<clickhouse>
<keeper_map_path_prefix>/test_keeper_map</keeper_map_path_prefix>
</clickhouse>

View File

@ -0,0 +1,4 @@
<?xml version="1.0"?>
<clickhouse>
<enable_system_unfreeze>true</enable_system_unfreeze>
</clickhouse>

View File

@ -47,8 +47,10 @@ ln -sf $SRC_PATH/config.d/named_collection.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/ssl_certs.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/filesystem_cache_log.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/session_log.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/system_unfreeze.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/enable_zero_copy_replication.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/nlp.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/enable_keeper_map.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/users.d/log_queries.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/readonly.xml $DEST_SERVER_PATH/users.d/
@ -61,6 +63,7 @@ ln -sf $SRC_PATH/users.d/memory_profiler.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/no_fsync_metadata.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/filelog.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/enable_blobs_check.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/marks.xml $DEST_SERVER_PATH/users.d/
# FIXME DataPartsExchange may hang for http_send_timeout seconds
# when nobody is going to read from the other side of socket (due to "Fetching of part was cancelled"),

View File

@ -0,0 +1,7 @@
<clickhouse>
<profiles>
<default>
<load_marks_asynchronously>1</load_marks_asynchronously>
</default>
</profiles>
</clickhouse>

View File

@ -0,0 +1 @@
#!/usr/bin/env python3

View File

@ -0,0 +1,3 @@
<clickhouse>
<keeper_map_path_prefix>/test_keeper_map</keeper_map_path_prefix>
</clickhouse>

View File

@ -0,0 +1,179 @@
import multiprocessing
import pytest
from time import sleep
import random
from itertools import count
from sys import stdout
from multiprocessing import Pool
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry, assert_logs_contain
from helpers.network import PartitionManager
test_recover_staled_replica_run = 1
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node",
main_configs=["configs/enable_keeper_map.xml"],
with_zookeeper=True,
stay_alive=True,
)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def get_genuine_zk():
return cluster.get_kazoo_client("zoo1")
def remove_children(client, path):
children = client.get_children(path)
for child in children:
child_path = f"{path}/{child}"
remove_children(client, child_path)
client.delete(child_path)
def test_create_keeper_map(started_cluster):
node.query(
"CREATE TABLE test_keeper_map (key UInt64, value UInt64) ENGINE = KeeperMap('/test1') PRIMARY KEY(key);"
)
zk_client = get_genuine_zk()
def assert_children_size(path, expected_size):
assert len(zk_client.get_children(path)) == expected_size
def assert_root_children_size(expected_size):
assert_children_size("/test_keeper_map/test1", expected_size)
def assert_data_children_size(expected_size):
assert_children_size("/test_keeper_map/test1/data", expected_size)
assert_root_children_size(2)
assert_data_children_size(0)
node.query("INSERT INTO test_keeper_map VALUES (1, 11)")
assert_data_children_size(1)
node.query(
"CREATE TABLE test_keeper_map_another (key UInt64, value UInt64) ENGINE = KeeperMap('/test1') PRIMARY KEY(key);"
)
assert_root_children_size(2)
assert_data_children_size(1)
node.query("INSERT INTO test_keeper_map_another VALUES (1, 11)")
assert_root_children_size(2)
assert_data_children_size(1)
node.query("INSERT INTO test_keeper_map_another VALUES (2, 22)")
assert_root_children_size(2)
assert_data_children_size(2)
node.query("DROP TABLE test_keeper_map SYNC")
assert_root_children_size(2)
assert_data_children_size(2)
node.query("DROP TABLE test_keeper_map_another SYNC")
assert_root_children_size(0)
zk_client.stop()
def create_drop_loop(index, stop_event):
table_name = f"test_keeper_map_{index}"
for i in count(0, 1):
if stop_event.is_set():
return
node.query(
f"CREATE TABLE {table_name} (key UInt64, value UInt64) ENGINE = KeeperMap('/test') PRIMARY KEY(key);"
)
node.query(f"INSERT INTO {table_name} VALUES ({index}, {i})")
result = node.query(f"SELECT value FROM {table_name} WHERE key = {index}")
assert result.strip() == str(i)
node.query(f"DROP TABLE {table_name} SYNC")
def test_create_drop_keeper_map_concurrent(started_cluster):
pool = Pool()
manager = multiprocessing.Manager()
stop_event = manager.Event()
results = []
for i in range(multiprocessing.cpu_count()):
sleep(0.2)
results.append(
pool.apply_async(
create_drop_loop,
args=(
i,
stop_event,
),
)
)
sleep(60)
stop_event.set()
for result in results:
result.get()
pool.close()
client = get_genuine_zk()
assert len(client.get_children("/test_keeper_map/test")) == 0
client.stop()
def test_keeper_map_without_zk(started_cluster):
def assert_keeper_exception_after_partition(query):
with PartitionManager() as pm:
pm.drop_instance_zk_connections(node)
error = node.query_and_get_error(query)
assert "Coordination::Exception" in error
assert_keeper_exception_after_partition(
"CREATE TABLE test_keeper_map (key UInt64, value UInt64) ENGINE = KeeperMap('/test1') PRIMARY KEY(key);"
)
node.query(
"CREATE TABLE test_keeper_map (key UInt64, value UInt64) ENGINE = KeeperMap('/test1') PRIMARY KEY(key);"
)
assert_keeper_exception_after_partition(
"INSERT INTO test_keeper_map VALUES (1, 11)"
)
node.query("INSERT INTO test_keeper_map VALUES (1, 11)")
assert_keeper_exception_after_partition("SELECT * FROM test_keeper_map")
node.query("SELECT * FROM test_keeper_map")
with PartitionManager() as pm:
pm.drop_instance_zk_connections(node)
node.restart_clickhouse(60)
error = node.query_and_get_error("SELECT * FROM test_keeper_map")
assert "Failed to activate table because of connection issues" in error
node.query("SELECT * FROM test_keeper_map")
client = get_genuine_zk()
remove_children(client, "/test_keeper_map/test1")
node.restart_clickhouse(60)
error = node.query_and_get_error("SELECT * FROM test_keeper_map")
assert "Failed to activate table because of invalid metadata in ZooKeeper" in error
node.query("DETACH TABLE test_keeper_map")
client.stop()

View File

@ -1,4 +1,4 @@
<yandex>
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
@ -19,9 +19,19 @@
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
<can_become_leader>true</can_become_leader>
<priority>3</priority>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</yandex>
</clickhouse>

View File

@ -0,0 +1,37 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<four_letter_word_white_list>*</four_letter_word_white_list>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<min_session_timeout_ms>5000</min_session_timeout_ms>
<snapshot_distance>75</snapshot_distance>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,37 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<four_letter_word_white_list>*</four_letter_word_white_list>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<min_session_timeout_ms>5000</min_session_timeout_ms>
<snapshot_distance>75</snapshot_distance>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -10,7 +10,15 @@ from kazoo.client import KazooClient
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1", main_configs=["configs/keeper_config.xml"], stay_alive=True
"node1", main_configs=["configs/keeper_config1.xml"], stay_alive=True
)
node2 = cluster.add_instance(
"node2", main_configs=["configs/keeper_config2.xml"], stay_alive=True
)
node3 = cluster.add_instance(
"node3", main_configs=["configs/keeper_config3.xml"], stay_alive=True
)
bool_struct = struct.Struct("B")
@ -61,7 +69,7 @@ def wait_node(node):
def wait_nodes():
for n in [node1]:
for n in [node1, node2, node3]:
wait_node(n)
@ -165,3 +173,21 @@ def test_session_timeout(started_cluster):
negotiated_timeout, _ = handshake(node1.name, session_timeout=20000, session_id=0)
assert negotiated_timeout == 10000
def test_session_close_shutdown(started_cluster):
wait_nodes()
node1_zk = get_fake_zk(node1.name)
node2_zk = get_fake_zk(node2.name)
eph_node = "/test_node"
node2_zk.create(eph_node, ephemeral=True)
assert node1_zk.exists(eph_node) != None
# shutdown while session is active
node2.stop_clickhouse()
assert node1_zk.exists(eph_node) == None
node2.start_clickhouse()

View File

@ -1,18 +1,3 @@
<clickhouse>
<tcp_port>9000</tcp_port>
<listen_host>127.0.0.1</listen_host>
<openSSL>
<client>
<cacheSessions>true</cacheSessions>
<verificationMode>none</verificationMode>
<invalidCertificateHandler>
<name>AcceptCertificateHandler</name>
</invalidCertificateHandler>
</client>
</openSSL>
<max_concurrent_queries>500</max_concurrent_queries>
<path>./clickhouse/</path>
<users_config>users.xml</users_config>
<enable_system_unfreeze>true</enable_system_unfreeze>
</clickhouse>

View File

@ -17,6 +17,7 @@ def cluster():
cluster.add_instance(
"node",
main_configs=[
"configs/config.xml",
"configs/config.d/storage_conf.xml",
"configs/config.d/bg_processing_pool_conf.xml",
],
@ -531,6 +532,8 @@ def test_freeze_unfreeze(cluster, node_name):
# Unfreeze all partitions from backup2.
node.query("ALTER TABLE s3_test UNFREEZE WITH NAME 'backup2'")
wait_for_delete_s3_objects(cluster, FILES_OVERHEAD)
# Data should be removed from S3.
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/"))) == FILES_OVERHEAD
@ -563,6 +566,8 @@ def test_freeze_system_unfreeze(cluster, node_name):
# Unfreeze all data from backup3.
node.query("SYSTEM UNFREEZE WITH NAME 'backup3'")
wait_for_delete_s3_objects(cluster, FILES_OVERHEAD)
# Data should be removed from S3.
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/"))) == FILES_OVERHEAD

View File

@ -1,5 +1,5 @@
<clickhouse>
<enable_system_unfreeze>true</enable_system_unfreeze>
<storage_configuration>
<disks>
<s31>

View File

@ -2,7 +2,7 @@
SET prefer_localhost_replica = 1;
SELECT count() FROM remote('127.0.0.1,localhos', system.one); -- { serverError 279 }
SELECT count() FROM remote('127.0.0.1,localhos', system.one); -- { serverError 198 }
SELECT count() FROM remote('127.0.0.1|localhos', system.one);
-- Clear cache to avoid future errors in the logs

View File

@ -5,7 +5,7 @@ SELECT count() > 0 FROM system.stack_trace WHERE query_id != '';
SELECT countIf(thread_id > 0) > 0 FROM system.stack_trace;
1
-- optimization for trace
SELECT length(trace) > 0 FROM system.stack_trace LIMIT 1;
SELECT count(trace) > 0 FROM system.stack_trace WHERE length(trace) > 0 LIMIT 1;
1
-- optimization for query_id
SELECT length(query_id) > 0 FROM system.stack_trace WHERE query_id != '' LIMIT 1;

View File

@ -5,7 +5,7 @@ SELECT count() > 0 FROM system.stack_trace WHERE query_id != '';
-- opimization for not reading /proc/self/task/{}/comm and avoid sending signal
SELECT countIf(thread_id > 0) > 0 FROM system.stack_trace;
-- optimization for trace
SELECT length(trace) > 0 FROM system.stack_trace LIMIT 1;
SELECT count(trace) > 0 FROM system.stack_trace WHERE length(trace) > 0 LIMIT 1;
-- optimization for query_id
SELECT length(query_id) > 0 FROM system.stack_trace WHERE query_id != '' LIMIT 1;
-- optimization for thread_name

View File

@ -2,6 +2,7 @@ DROP TABLE IF EXISTS data_01283;
set remote_filesystem_read_method = 'read';
set local_filesystem_read_method = 'pread';
set load_marks_asynchronously = 0;
CREATE TABLE data_01283 engine=MergeTree()
ORDER BY key

View File

@ -2,6 +2,7 @@ drop table if exists table_01323_many_parts;
set remote_filesystem_read_method = 'read';
set local_filesystem_read_method = 'pread';
set load_marks_asynchronously = 0;
create table table_01323_many_parts (x UInt64) engine = MergeTree order by x partition by x % 100;
set max_partitions_per_insert_block = 100;

View File

@ -1,5 +1,5 @@
#!/usr/bin/env bash
# Tags: no-replicated-database, no-parallel, no-ordinary-database
# Tags: no-replicated-database, no-parallel
# Tag no-replicated-database: Unsupported type of ALTER query
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)

View File

@ -33,6 +33,7 @@ OPTIMIZE TABLE select_final FINAL;
SET remote_filesystem_read_method = 'read';
SET local_filesystem_read_method = 'pread';
set load_marks_asynchronously = 0;
SELECT max(x) FROM select_final FINAL;

View File

@ -0,0 +1,21 @@
-- Regression test when Join stores data on disk and receive empty block.
-- Because of this it does not create empty file, while expect it.
SET max_threads = 1;
SET join_algorithm = 'auto';
SET max_rows_in_join = 1000;
SET optimize_aggregation_in_order = 1;
SET max_block_size = 1000;
DROP TABLE IF EXISTS join_on_disk;
SYSTEM STOP MERGES join_on_disk;
CREATE TABLE join_on_disk (id Int) Engine=MergeTree() ORDER BY id;
INSERT INTO join_on_disk SELECT number as id FROM numbers_mt(50000);
INSERT INTO join_on_disk SELECT number as id FROM numbers_mt(1000);
SELECT id FROM join_on_disk lhs LEFT JOIN (SELECT id FROM join_on_disk GROUP BY id) rhs USING (id) FORMAT Null;
DROP TABLE join_on_disk;

View File

@ -30,7 +30,7 @@ EOF
${CLICKHOUSE_CLIENT} -q "SYSTEM STOP MERGES lazy_mark_test"
${CLICKHOUSE_CLIENT} -q "INSERT INTO lazy_mark_test select number, number % 3, number % 5, number % 10, number % 13, number % 15, number % 17, number % 18, number % 22, number % 25 from numbers(1000000)"
${CLICKHOUSE_CLIENT} -q "SYSTEM DROP MARK CACHE"
${CLICKHOUSE_CLIENT} --log_queries=1 --query_id "${QUERY_ID}" -q "SELECT * FROM lazy_mark_test WHERE n3==11"
${CLICKHOUSE_CLIENT} --log_queries=1 --query_id "${QUERY_ID}" -q "SELECT * FROM lazy_mark_test WHERE n3==11 SETTINGS load_marks_asynchronously=0"
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
${CLICKHOUSE_CLIENT} -q "select ProfileEvents['FileOpen'] from system.query_log where query_id = '${QUERY_ID}' and type = 'QueryFinish' and current_database = currentDatabase()"

View File

@ -8,7 +8,8 @@ GROUP BY
(number),
(number % 2)
)
ORDER BY number, gr;
ORDER BY number, gr
SETTINGS force_grouping_standard_compatibility=0;
0 1
0 1
0 2
@ -30,7 +31,8 @@ GROUP BY
(number),
(number % 2)
)
ORDER BY number, gr;
ORDER BY number, gr
SETTINGS force_grouping_standard_compatibility=0;
0 1
0 2
0 2
@ -52,7 +54,8 @@ GROUP BY
(number),
(number % 2)
)
ORDER BY number, gr;
ORDER BY number, gr
SETTINGS force_grouping_standard_compatibility=0;
0 0
0 1
0 1
@ -73,7 +76,8 @@ GROUP BY
(number),
(number % 2)
)
ORDER BY number, grouping(number, number % 2) = 1;
ORDER BY number, grouping(number, number % 2) = 1
SETTINGS force_grouping_standard_compatibility=0;
0
0
0
@ -97,7 +101,8 @@ GROUP BY
(number, number % 2),
()
)
ORDER BY (gr, number);
ORDER BY (gr, number)
SETTINGS force_grouping_standard_compatibility=0;
0 10 0
0 1 2
1 1 2
@ -129,7 +134,7 @@ GROUP BY
)
HAVING grouping(number, number % 2) = 2
ORDER BY number
SETTINGS enable_optimize_predicate_expression = 0;
SETTINGS enable_optimize_predicate_expression = 0, force_grouping_standard_compatibility=0;
0
1
2
@ -150,7 +155,7 @@ GROUP BY
)
HAVING grouping(number, number % 2) = 1
ORDER BY number
SETTINGS enable_optimize_predicate_expression = 0;
SETTINGS enable_optimize_predicate_expression = 0, force_grouping_standard_compatibility=0;
0
0
SELECT
@ -161,7 +166,8 @@ GROUP BY
GROUPING SETS (
(number),
(number % 2))
ORDER BY number, gr;
ORDER BY number, gr
SETTINGS force_grouping_standard_compatibility=0;
0 0
0 1
0 1

View File

@ -19,7 +19,8 @@ GROUP BY
(number),
(number % 2)
)
ORDER BY number, gr;
ORDER BY number, gr
SETTINGS force_grouping_standard_compatibility=0;
SELECT
number,
@ -30,7 +31,8 @@ GROUP BY
(number),
(number % 2)
)
ORDER BY number, gr;
ORDER BY number, gr
SETTINGS force_grouping_standard_compatibility=0;
SELECT
number,
@ -41,7 +43,8 @@ GROUP BY
(number),
(number % 2)
)
ORDER BY number, gr;
ORDER BY number, gr
SETTINGS force_grouping_standard_compatibility=0;
SELECT
number
@ -51,7 +54,8 @@ GROUP BY
(number),
(number % 2)
)
ORDER BY number, grouping(number, number % 2) = 1;
ORDER BY number, grouping(number, number % 2) = 1
SETTINGS force_grouping_standard_compatibility=0;
SELECT
number,
@ -64,7 +68,8 @@ GROUP BY
(number, number % 2),
()
)
ORDER BY (gr, number);
ORDER BY (gr, number)
SETTINGS force_grouping_standard_compatibility=0;
SELECT
number
@ -76,7 +81,7 @@ GROUP BY
)
HAVING grouping(number, number % 2) = 2
ORDER BY number
SETTINGS enable_optimize_predicate_expression = 0;
SETTINGS enable_optimize_predicate_expression = 0, force_grouping_standard_compatibility=0;
SELECT
number
@ -88,7 +93,7 @@ GROUP BY
)
HAVING grouping(number, number % 2) = 1
ORDER BY number
SETTINGS enable_optimize_predicate_expression = 0;
SETTINGS enable_optimize_predicate_expression = 0, force_grouping_standard_compatibility=0;
SELECT
number,
@ -98,4 +103,5 @@ GROUP BY
GROUPING SETS (
(number),
(number % 2))
ORDER BY number, gr;
ORDER BY number, gr
SETTINGS force_grouping_standard_compatibility=0;

View File

@ -6,7 +6,8 @@ FROM remote('127.0.0.{2,3}', numbers(10))
GROUP BY
number,
number % 2
ORDER BY number;
ORDER BY number
SETTINGS force_grouping_standard_compatibility=0;
0 1
1 1
2 1
@ -25,7 +26,8 @@ FROM remote('127.0.0.{2,3}', numbers(10))
GROUP BY
number,
number % 2
ORDER BY number;
ORDER BY number
SETTINGS force_grouping_standard_compatibility=0;
0 1 1
1 1 1
2 1 1
@ -45,7 +47,8 @@ GROUP BY
number % 2
WITH ROLLUP
ORDER BY
number, gr;
number, gr
SETTINGS force_grouping_standard_compatibility=0;
0 0
0 2
0 3
@ -74,7 +77,8 @@ FROM remote('127.0.0.{2,3}', numbers(10))
GROUP BY
ROLLUP(number, number % 2)
ORDER BY
number, gr;
number, gr
SETTINGS force_grouping_standard_compatibility=0;
0 0
0 2
0 3
@ -105,7 +109,8 @@ GROUP BY
number % 2
WITH CUBE
ORDER BY
number, gr;
number, gr
SETTINGS force_grouping_standard_compatibility=0;
0 0
0 1
0 1
@ -136,7 +141,8 @@ FROM remote('127.0.0.{2,3}', numbers(10))
GROUP BY
CUBE(number, number % 2)
ORDER BY
number, gr;
number, gr
SETTINGS force_grouping_standard_compatibility=0;
0 0
0 1
0 1
@ -168,7 +174,8 @@ GROUP BY
CUBE(number, number % 2)
HAVING grouping(number) != 0
ORDER BY
number, gr;
number, gr
SETTINGS force_grouping_standard_compatibility=0;
0 5
0 6
1 5
@ -205,7 +212,8 @@ FROM remote('127.0.0.{2,3}', numbers(10))
GROUP BY
CUBE(number, number % 2) WITH TOTALS
ORDER BY
number, gr;
number, gr
SETTINGS force_grouping_standard_compatibility=0;
0 0
0 1
0 1
@ -247,7 +255,8 @@ FROM remote('127.0.0.{2,3}', numbers(10))
GROUP BY
ROLLUP(number, number % 2) WITH TOTALS
ORDER BY
number, gr;
number, gr
SETTINGS force_grouping_standard_compatibility=0;
0 0
0 2
0 3

View File

@ -15,7 +15,8 @@ FROM remote('127.0.0.{2,3}', numbers(10))
GROUP BY
number,
number % 2
ORDER BY number;
ORDER BY number
SETTINGS force_grouping_standard_compatibility=0;
SELECT
number,
@ -25,7 +26,8 @@ FROM remote('127.0.0.{2,3}', numbers(10))
GROUP BY
number,
number % 2
ORDER BY number;
ORDER BY number
SETTINGS force_grouping_standard_compatibility=0;
SELECT
number,
@ -36,7 +38,8 @@ GROUP BY
number % 2
WITH ROLLUP
ORDER BY
number, gr;
number, gr
SETTINGS force_grouping_standard_compatibility=0;
SELECT
number,
@ -45,7 +48,8 @@ FROM remote('127.0.0.{2,3}', numbers(10))
GROUP BY
ROLLUP(number, number % 2)
ORDER BY
number, gr;
number, gr
SETTINGS force_grouping_standard_compatibility=0;
SELECT
number,
@ -56,7 +60,8 @@ GROUP BY
number % 2
WITH CUBE
ORDER BY
number, gr;
number, gr
SETTINGS force_grouping_standard_compatibility=0;
SELECT
number,
@ -65,7 +70,8 @@ FROM remote('127.0.0.{2,3}', numbers(10))
GROUP BY
CUBE(number, number % 2)
ORDER BY
number, gr;
number, gr
SETTINGS force_grouping_standard_compatibility=0;
SELECT
number,
@ -75,7 +81,8 @@ GROUP BY
CUBE(number, number % 2)
HAVING grouping(number) != 0
ORDER BY
number, gr;
number, gr
SETTINGS force_grouping_standard_compatibility=0;
SELECT
number,
@ -94,7 +101,8 @@ FROM remote('127.0.0.{2,3}', numbers(10))
GROUP BY
CUBE(number, number % 2) WITH TOTALS
ORDER BY
number, gr;
number, gr
SETTINGS force_grouping_standard_compatibility=0;
SELECT
number,
@ -113,4 +121,5 @@ FROM remote('127.0.0.{2,3}', numbers(10))
GROUP BY
ROLLUP(number, number % 2) WITH TOTALS
ORDER BY
number, gr;
number, gr
SETTINGS force_grouping_standard_compatibility=0;

View File

@ -1,5 +1,5 @@
-- { echoOn }
SELECT count() AS amount, a, b, GROUPING(a, b) FROM test02315 GROUP BY GROUPING SETS ((a, b), (a), ()) ORDER BY (amount, a, b);
SELECT count() AS amount, a, b, GROUPING(a, b) FROM test02315 GROUP BY GROUPING SETS ((a, b), (a), ()) ORDER BY (amount, a, b) SETTINGS force_grouping_standard_compatibility=0;
1 0 0 3
1 0 2 3
1 0 4 3
@ -13,7 +13,7 @@ SELECT count() AS amount, a, b, GROUPING(a, b) FROM test02315 GROUP BY GROUPING
5 0 0 2
5 1 0 2
10 0 0 0
SELECT count() AS amount, a, b, GROUPING(a, b) FROM test02315 GROUP BY ROLLUP(a, b) ORDER BY (amount, a, b);
SELECT count() AS amount, a, b, GROUPING(a, b) FROM test02315 GROUP BY ROLLUP(a, b) ORDER BY (amount, a, b) SETTINGS force_grouping_standard_compatibility=0;
1 0 0 3
1 0 2 3
1 0 4 3

View File

@ -5,9 +5,9 @@ CREATE TABLE test02315(a UInt64, b UInt64) ENGINE=MergeTree() ORDER BY (a, b);
INSERT INTO test02315 SELECT number % 2 as a, number as b FROM numbers(10);
-- { echoOn }
SELECT count() AS amount, a, b, GROUPING(a, b) FROM test02315 GROUP BY GROUPING SETS ((a, b), (a), ()) ORDER BY (amount, a, b);
SELECT count() AS amount, a, b, GROUPING(a, b) FROM test02315 GROUP BY GROUPING SETS ((a, b), (a), ()) ORDER BY (amount, a, b) SETTINGS force_grouping_standard_compatibility=0;
SELECT count() AS amount, a, b, GROUPING(a, b) FROM test02315 GROUP BY ROLLUP(a, b) ORDER BY (amount, a, b);
SELECT count() AS amount, a, b, GROUPING(a, b) FROM test02315 GROUP BY ROLLUP(a, b) ORDER BY (amount, a, b) SETTINGS force_grouping_standard_compatibility=0;
-- { echoOff }
DROP TABLE test02315;

View File

@ -0,0 +1,29 @@
-- { echoOn }
SELECT count() AS amount, a, b, GROUPING(a, b) FROM test02416 GROUP BY GROUPING SETS ((a, b), (a), ()) ORDER BY (amount, a, b);
1 0 0 0
1 0 2 0
1 0 4 0
1 0 6 0
1 0 8 0
1 1 1 0
1 1 3 0
1 1 5 0
1 1 7 0
1 1 9 0
5 0 0 1
5 1 0 1
10 0 0 3
SELECT count() AS amount, a, b, GROUPING(a, b) FROM test02416 GROUP BY ROLLUP(a, b) ORDER BY (amount, a, b);
1 0 0 0
1 0 2 0
1 0 4 0
1 0 6 0
1 0 8 0
1 1 1 0
1 1 3 0
1 1 5 0
1 1 7 0
1 1 9 0
5 0 0 1
5 1 0 1
10 0 0 3

View File

@ -0,0 +1,14 @@
DROP TABLE IF EXISTS test02416;
CREATE TABLE test02416(a UInt64, b UInt64) ENGINE=MergeTree() ORDER BY (a, b);
INSERT INTO test02416 SELECT number % 2 as a, number as b FROM numbers(10);
-- { echoOn }
SELECT count() AS amount, a, b, GROUPING(a, b) FROM test02416 GROUP BY GROUPING SETS ((a, b), (a), ()) ORDER BY (amount, a, b);
SELECT count() AS amount, a, b, GROUPING(a, b) FROM test02416 GROUP BY ROLLUP(a, b) ORDER BY (amount, a, b);
-- { echoOff }
DROP TABLE test02416;

View File

@ -0,0 +1,6 @@
1
1
1
1
1 1 1 1 1
1

View File

@ -0,0 +1,38 @@
-- Tags: no-ordinary-database, no-fasttest, long
DROP TABLE IF EXISTS 02416_test SYNC;
CREATE TABLE 02416_test (key String, value UInt32) Engine=KeeperMap('/' || currentDatabase() || '/test2416'); -- { serverError 36 }
CREATE TABLE 02416_test (key String, value UInt32) Engine=KeeperMap('/' || currentDatabase() || '/test2416') PRIMARY KEY(key2); -- { serverError 47 }
CREATE TABLE 02416_test (key String, value UInt32) Engine=KeeperMap('/' || currentDatabase() || '/test2416') PRIMARY KEY(key, value); -- { serverError 36 }
CREATE TABLE 02416_test (key String, value UInt32) Engine=KeeperMap('/' || currentDatabase() || '/test2416') PRIMARY KEY(concat(key, value)); -- { serverError 36 }
CREATE TABLE 02416_test (key Tuple(String, UInt32), value UInt64) Engine=KeeperMap('/' || currentDatabase() || '/test2416') PRIMARY KEY(key);
DROP TABLE IF EXISTS 02416_test SYNC;
CREATE TABLE 02416_test (key String, value UInt32) Engine=KeeperMap('/' || currentDatabase() || '/test2416') PRIMARY KEY(key);
INSERT INTO 02416_test SELECT '1_1', number FROM numbers(1000);
SELECT COUNT(1) == 1 FROM 02416_test;
INSERT INTO 02416_test SELECT concat(toString(number), '_1'), number FROM numbers(1000);
SELECT COUNT(1) == 1000 FROM 02416_test;
SELECT uniqExact(key) == 32 FROM (SELECT * FROM 02416_test LIMIT 32 SETTINGS max_block_size = 1);
SELECT SUM(value) == 1 + 99 + 900 FROM 02416_test WHERE key IN ('1_1', '99_1', '900_1');
DROP TABLE IF EXISTS 02416_test SYNC;
DROP TABLE IF EXISTS 02416_test_memory;
CREATE TABLE 02416_test (k UInt32, value UInt64, dummy Tuple(UInt32, Float64), bm AggregateFunction(groupBitmap, UInt64)) Engine=KeeperMap('/' || currentDatabase() || '/test2416') PRIMARY KEY(k);
CREATE TABLE 02416_test_memory AS 02416_test Engine = Memory;
INSERT INTO 02416_test SELECT number % 77 AS k, SUM(number) AS value, (1, 1.2), bitmapBuild(groupArray(number)) FROM numbers(10000) group by k;
INSERT INTO 02416_test_memory SELECT number % 77 AS k, SUM(number) AS value, (1, 1.2), bitmapBuild(groupArray(number)) FROM numbers(10000) group by k;
SELECT A.a = B.a, A.b = B.b, A.c = B.c, A.d = B.d, A.e = B.e FROM ( SELECT 0 AS a, groupBitmapMerge(bm) AS b , SUM(k) AS c, SUM(value) AS d, SUM(dummy.1) AS e FROM 02416_test) A ANY LEFT JOIN (SELECT 0 AS a, groupBitmapMerge(bm) AS b , SUM(k) AS c, SUM(value) AS d, SUM(dummy.1) AS e FROM 02416_test_memory) B USING a ORDER BY a;
TRUNCATE TABLE 02416_test;
SELECT 0 == COUNT(1) FROM 02416_test;
DROP TABLE IF EXISTS 02416_test SYNC;
DROP TABLE IF EXISTS 02416_test_memory;

View File

@ -0,0 +1,10 @@
1 11
------
1 11
2 22
------
1 11
2 22
------
1 11
2 22

View File

@ -0,0 +1,20 @@
-- Tags: no-ordinary-database, no-fasttest
DROP TABLE IF EXISTS 02417_test SYNC;
CREATE TABLE 02417_test (key UInt64, value UInt64) Engine=KeeperMap('/' || currentDatabase() || '/test2417') PRIMARY KEY(key);
INSERT INTO 02417_test VALUES (1, 11);
SELECT * FROM 02417_test ORDER BY key;
SELECT '------';
CREATE TABLE 02417_test_another (key UInt64, value UInt64) Engine=KeeperMap('/' || currentDatabase() || '/test2417') PRIMARY KEY(key);
INSERT INTO 02417_test_another VALUES (2, 22);
SELECT * FROM 02417_test_another ORDER BY key;
SELECT '------';
SELECT * FROM 02417_test ORDER BY key;
SELECT '------';
DROP TABLE 02417_test SYNC;
SELECT * FROM 02417_test_another ORDER BY key;
DROP TABLE 02417_test_another SYNC;

View File

@ -0,0 +1,2 @@
Ok
Ok

Some files were not shown because too many files have changed in this diff Show More