Merge remote-tracking branch 'origin/master' into pr-local-plan

This commit is contained in:
Igor Nikonov 2024-07-03 07:51:38 +00:00
commit b51e1b27db
16 changed files with 209 additions and 86 deletions

View File

@ -1,24 +1,20 @@
---
slug: /en/sql-reference/data-types/json
slug: /en/sql-reference/data-types/object-data-type
sidebar_position: 26
sidebar_label: JSON
sidebar_label: Object Data Type
keywords: [object, data type]
---
# JSON
# Object Data Type
:::note
This feature is experimental and is not production-ready. If you need to work with JSON documents, consider using [this guide](/docs/en/integrations/data-ingestion/data-formats/json.md) instead.
This feature is not production-ready and is now deprecated. If you need to work with JSON documents, consider using [this guide](/docs/en/integrations/data-ingestion/data-formats/json) instead. A new implementation to support JSON object is in progress and can be tracked [here](https://github.com/ClickHouse/ClickHouse/issues/54864)
:::
Stores JavaScript Object Notation (JSON) documents in a single column.
`JSON` is an alias for `Object('json')`.
:::note
The JSON data type is an obsolete feature. Do not use it.
If you want to use it, set `allow_experimental_object_type = 1`.
:::
## Example
**Example 1**
@ -49,7 +45,7 @@ SELECT o.a, o.b.c, o.b.d[3] FROM json
**Example 2**
To be able to create an ordered `MergeTree` family table the sorting key has to be extracted into its column. For example, to insert a file of compressed HTTP access logs in JSON format:
To be able to create an ordered `MergeTree` family table, the sorting key has to be extracted into its column. For example, to insert a file of compressed HTTP access logs in JSON format:
```sql
CREATE TABLE logs
@ -69,7 +65,7 @@ FROM file('access.json.gz', JSONAsString)
## Displaying JSON columns
When displaying a `JSON` column ClickHouse only shows the field values by default (because internally, it is represented as a tuple). You can display the field names as well by setting `output_format_json_named_tuples_as_objects = 1`:
When displaying a `JSON` column, ClickHouse only shows the field values by default (because internally, it is represented as a tuple). You can also display the field names by setting `output_format_json_named_tuples_as_objects = 1`:
```sql
SET output_format_json_named_tuples_as_objects = 1
@ -83,4 +79,5 @@ SELECT * FROM json FORMAT JSONEachRow
## Related Content
- [Using JSON in ClickHouse](/docs/en/integrations/data-formats/json)
- [Getting Data Into ClickHouse - Part 2 - A JSON detour](https://clickhouse.com/blog/getting-data-into-clickhouse-part-2-json)

View File

@ -611,6 +611,13 @@ The server successfully detected this situation and will download merged part fr
M(KeeperPacketsReceived, "Packets received by keeper server") \
M(KeeperRequestTotal, "Total requests number on keeper server") \
M(KeeperLatency, "Keeper latency") \
M(KeeperTotalElapsedMicroseconds, "Keeper total latency for a single request") \
M(KeeperProcessElapsedMicroseconds, "Keeper commit latency for a single request") \
M(KeeperPreprocessElapsedMicroseconds, "Keeper preprocessing latency for a single reuquest") \
M(KeeperStorageLockWaitMicroseconds, "Time spent waiting for acquiring Keeper storage lock") \
M(KeeperCommitWaitElapsedMicroseconds, "Time spent waiting for certain log to be committed") \
M(KeeperBatchMaxCount, "Number of times the size of batch was limited by the amount") \
M(KeeperBatchMaxTotalSize, "Number of times the size of batch was limited by the total bytes size") \
M(KeeperCommits, "Number of successful commits") \
M(KeeperCommitsFailed, "Number of failed commits") \
M(KeeperSnapshotCreations, "Number of snapshots creations")\

View File

@ -9,7 +9,6 @@
#include <IO/ReadHelpers.h>
#include <fmt/format.h>
#include <Common/logger_useful.h>
#include <array>
namespace Coordination
@ -29,7 +28,7 @@ void ZooKeeperResponse::write(WriteBuffer & out) const
Coordination::write(buf.str(), out);
}
std::string ZooKeeperRequest::toString() const
std::string ZooKeeperRequest::toString(bool short_format) const
{
return fmt::format(
"XID = {}\n"
@ -37,7 +36,7 @@ std::string ZooKeeperRequest::toString() const
"Additional info:\n{}",
xid,
getOpNum(),
toStringImpl());
toStringImpl(short_format));
}
void ZooKeeperRequest::write(WriteBuffer & out) const
@ -60,7 +59,7 @@ void ZooKeeperSyncRequest::readImpl(ReadBuffer & in)
Coordination::read(path, in);
}
std::string ZooKeeperSyncRequest::toStringImpl() const
std::string ZooKeeperSyncRequest::toStringImpl(bool /*short_format*/) const
{
return fmt::format("path = {}", path);
}
@ -91,7 +90,7 @@ void ZooKeeperReconfigRequest::readImpl(ReadBuffer & in)
Coordination::read(version, in);
}
std::string ZooKeeperReconfigRequest::toStringImpl() const
std::string ZooKeeperReconfigRequest::toStringImpl(bool /*short_format*/) const
{
return fmt::format(
"joining = {}\nleaving = {}\nnew_members = {}\nversion = {}",
@ -145,7 +144,7 @@ void ZooKeeperAuthRequest::readImpl(ReadBuffer & in)
Coordination::read(data, in);
}
std::string ZooKeeperAuthRequest::toStringImpl() const
std::string ZooKeeperAuthRequest::toStringImpl(bool /*short_format*/) const
{
return fmt::format(
"type = {}\n"
@ -191,7 +190,7 @@ void ZooKeeperCreateRequest::readImpl(ReadBuffer & in)
is_sequential = true;
}
std::string ZooKeeperCreateRequest::toStringImpl() const
std::string ZooKeeperCreateRequest::toStringImpl(bool /*short_format*/) const
{
return fmt::format(
"path = {}\n"
@ -218,7 +217,7 @@ void ZooKeeperRemoveRequest::writeImpl(WriteBuffer & out) const
Coordination::write(version, out);
}
std::string ZooKeeperRemoveRequest::toStringImpl() const
std::string ZooKeeperRemoveRequest::toStringImpl(bool /*short_format*/) const
{
return fmt::format(
"path = {}\n"
@ -245,7 +244,7 @@ void ZooKeeperExistsRequest::readImpl(ReadBuffer & in)
Coordination::read(has_watch, in);
}
std::string ZooKeeperExistsRequest::toStringImpl() const
std::string ZooKeeperExistsRequest::toStringImpl(bool /*short_format*/) const
{
return fmt::format("path = {}", path);
}
@ -272,7 +271,7 @@ void ZooKeeperGetRequest::readImpl(ReadBuffer & in)
Coordination::read(has_watch, in);
}
std::string ZooKeeperGetRequest::toStringImpl() const
std::string ZooKeeperGetRequest::toStringImpl(bool /*short_format*/) const
{
return fmt::format("path = {}", path);
}
@ -303,7 +302,7 @@ void ZooKeeperSetRequest::readImpl(ReadBuffer & in)
Coordination::read(version, in);
}
std::string ZooKeeperSetRequest::toStringImpl() const
std::string ZooKeeperSetRequest::toStringImpl(bool /*short_format*/) const
{
return fmt::format(
"path = {}\n"
@ -334,7 +333,7 @@ void ZooKeeperListRequest::readImpl(ReadBuffer & in)
Coordination::read(has_watch, in);
}
std::string ZooKeeperListRequest::toStringImpl() const
std::string ZooKeeperListRequest::toStringImpl(bool /*short_format*/) const
{
return fmt::format("path = {}", path);
}
@ -356,7 +355,7 @@ void ZooKeeperFilteredListRequest::readImpl(ReadBuffer & in)
list_request_type = static_cast<ListRequestType>(read_request_type);
}
std::string ZooKeeperFilteredListRequest::toStringImpl() const
std::string ZooKeeperFilteredListRequest::toStringImpl(bool /*short_format*/) const
{
return fmt::format(
"path = {}\n"
@ -401,7 +400,7 @@ void ZooKeeperSetACLRequest::readImpl(ReadBuffer & in)
Coordination::read(version, in);
}
std::string ZooKeeperSetACLRequest::toStringImpl() const
std::string ZooKeeperSetACLRequest::toStringImpl(bool /*short_format*/) const
{
return fmt::format("path = {}\nversion = {}", path, version);
}
@ -426,7 +425,7 @@ void ZooKeeperGetACLRequest::writeImpl(WriteBuffer & out) const
Coordination::write(path, out);
}
std::string ZooKeeperGetACLRequest::toStringImpl() const
std::string ZooKeeperGetACLRequest::toStringImpl(bool /*short_format*/) const
{
return fmt::format("path = {}", path);
}
@ -455,7 +454,7 @@ void ZooKeeperCheckRequest::readImpl(ReadBuffer & in)
Coordination::read(version, in);
}
std::string ZooKeeperCheckRequest::toStringImpl() const
std::string ZooKeeperCheckRequest::toStringImpl(bool /*short_format*/) const
{
return fmt::format("path = {}\nversion = {}", path, version);
}
@ -600,8 +599,11 @@ void ZooKeeperMultiRequest::readImpl(ReadBuffer & in)
}
}
std::string ZooKeeperMultiRequest::toStringImpl() const
std::string ZooKeeperMultiRequest::toStringImpl(bool short_format) const
{
if (short_format)
return fmt::format("Subrequests size = {}", requests.size());
auto out = fmt::memory_buffer();
for (const auto & request : requests)
{

View File

@ -63,12 +63,12 @@ struct ZooKeeperRequest : virtual Request
/// Writes length, xid, op_num, then the rest.
void write(WriteBuffer & out) const;
std::string toString() const;
std::string toString(bool short_format = false) const;
virtual void writeImpl(WriteBuffer &) const = 0;
virtual void readImpl(ReadBuffer &) = 0;
virtual std::string toStringImpl() const { return ""; }
virtual std::string toStringImpl(bool /*short_format*/) const { return ""; }
static std::shared_ptr<ZooKeeperRequest> read(ReadBuffer & in);
@ -98,7 +98,7 @@ struct ZooKeeperSyncRequest final : ZooKeeperRequest
OpNum getOpNum() const override { return OpNum::Sync; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override;
std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return false; }
@ -123,7 +123,7 @@ struct ZooKeeperReconfigRequest final : ZooKeeperRequest
OpNum getOpNum() const override { return OpNum::Reconfig; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override;
std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return false; }
@ -176,7 +176,7 @@ struct ZooKeeperAuthRequest final : ZooKeeperRequest
OpNum getOpNum() const override { return OpNum::Auth; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override;
std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return false; }
@ -229,7 +229,7 @@ struct ZooKeeperCreateRequest final : public CreateRequest, ZooKeeperRequest
OpNum getOpNum() const override { return not_exists ? OpNum::CreateIfNotExists : OpNum::Create; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override;
std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return false; }
@ -266,7 +266,7 @@ struct ZooKeeperRemoveRequest final : RemoveRequest, ZooKeeperRequest
OpNum getOpNum() const override { return OpNum::Remove; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override;
std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return false; }
@ -293,7 +293,7 @@ struct ZooKeeperExistsRequest final : ExistsRequest, ZooKeeperRequest
OpNum getOpNum() const override { return OpNum::Exists; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override;
std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return true; }
@ -320,7 +320,7 @@ struct ZooKeeperGetRequest final : GetRequest, ZooKeeperRequest
OpNum getOpNum() const override { return OpNum::Get; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override;
std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return true; }
@ -347,7 +347,7 @@ struct ZooKeeperSetRequest final : SetRequest, ZooKeeperRequest
OpNum getOpNum() const override { return OpNum::Set; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override;
std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return false; }
@ -375,7 +375,7 @@ struct ZooKeeperListRequest : ListRequest, ZooKeeperRequest
OpNum getOpNum() const override { return OpNum::List; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override;
std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return true; }
@ -395,7 +395,7 @@ struct ZooKeeperFilteredListRequest final : ZooKeeperListRequest
OpNum getOpNum() const override { return OpNum::FilteredList; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override;
std::string toStringImpl(bool short_format) const override;
size_t bytesSize() const override { return ZooKeeperListRequest::bytesSize() + sizeof(list_request_type); }
};
@ -428,7 +428,7 @@ struct ZooKeeperCheckRequest : CheckRequest, ZooKeeperRequest
OpNum getOpNum() const override { return not_exists ? OpNum::CheckNotExists : OpNum::Check; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override;
std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return true; }
@ -469,7 +469,7 @@ struct ZooKeeperSetACLRequest final : SetACLRequest, ZooKeeperRequest
OpNum getOpNum() const override { return OpNum::SetACL; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override;
std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return false; }
@ -490,7 +490,7 @@ struct ZooKeeperGetACLRequest final : GetACLRequest, ZooKeeperRequest
OpNum getOpNum() const override { return OpNum::GetACL; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override;
std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return true; }
@ -516,7 +516,7 @@ struct ZooKeeperMultiRequest final : MultiRequest, ZooKeeperRequest
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override;
std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override;

View File

@ -169,6 +169,23 @@ void KeeperConfigurationAndSettings::dump(WriteBufferFromOwnString & buf) const
writeText("async_replication=", buf);
write_bool(coordination_settings->async_replication);
writeText("latest_logs_cache_size_threshold=", buf);
write_int(coordination_settings->latest_logs_cache_size_threshold);
writeText("commit_logs_cache_size_threshold=", buf);
write_int(coordination_settings->commit_logs_cache_size_threshold);
writeText("disk_move_retries_wait_ms=", buf);
write_int(coordination_settings->disk_move_retries_wait_ms);
writeText("disk_move_retries_during_init=", buf);
write_int(coordination_settings->disk_move_retries_during_init);
writeText("log_slow_total_threshold_ms=", buf);
write_int(coordination_settings->log_slow_total_threshold_ms);
writeText("log_slow_cpu_threshold_ms=", buf);
write_int(coordination_settings->log_slow_cpu_threshold_ms);
writeText("log_slow_connection_operation_threshold_ms=", buf);
write_int(coordination_settings->log_slow_connection_operation_threshold_ms);
}
KeeperConfigurationAndSettingsPtr

View File

@ -58,7 +58,10 @@ struct Settings;
M(UInt64, latest_logs_cache_size_threshold, 1 * 1024 * 1024 * 1024, "Maximum total size of in-memory cache of latest log entries.", 0) \
M(UInt64, commit_logs_cache_size_threshold, 500 * 1024 * 1024, "Maximum total size of in-memory cache of log entries needed next for commit.", 0) \
M(UInt64, disk_move_retries_wait_ms, 1000, "How long to wait between retries after a failure which happened while a file was being moved between disks.", 0) \
M(UInt64, disk_move_retries_during_init, 100, "The amount of retries after a failure which happened while a file was being moved between disks during initialization.", 0)
M(UInt64, disk_move_retries_during_init, 100, "The amount of retries after a failure which happened while a file was being moved between disks during initialization.", 0) \
M(UInt64, log_slow_total_threshold_ms, 5000, "Requests for which the total latency is larger than this settings will be logged", 0) \
M(UInt64, log_slow_cpu_threshold_ms, 100, "Requests for which the CPU (preprocessing and processing) latency is larger than this settings will be logged", 0) \
M(UInt64, log_slow_connection_operation_threshold_ms, 1000, "Log message if a certain operation took too long inside a single connection", 0)
DECLARE_SETTINGS_TRAITS(CoordinationSettingsTraits, LIST_OF_COORDINATION_SETTINGS)

View File

@ -238,6 +238,13 @@
M(KeeperPacketsReceived) \
M(KeeperRequestTotal) \
M(KeeperLatency) \
M(KeeperTotalElapsedMicroseconds) \
M(KeeperProcessElapsedMicroseconds) \
M(KeeperPreprocessElapsedMicroseconds) \
M(KeeperStorageLockWaitMicroseconds) \
M(KeeperCommitWaitElapsedMicroseconds) \
M(KeeperBatchMaxCount) \
M(KeeperBatchMaxTotalSize) \
M(KeeperCommits) \
M(KeeperCommitsFailed) \
M(KeeperSnapshotCreations) \

View File

@ -31,6 +31,13 @@ namespace CurrentMetrics
extern const Metric KeeperOutstandingRequets;
}
namespace ProfileEvents
{
extern const Event KeeperCommitWaitElapsedMicroseconds;
extern const Event KeeperBatchMaxCount;
extern const Event KeeperBatchMaxTotalSize;
}
using namespace std::chrono_literals;
namespace DB
@ -119,6 +126,7 @@ void KeeperDispatcher::requestThread()
auto coordination_settings = configuration_and_settings->coordination_settings;
uint64_t max_wait = coordination_settings->operation_timeout_ms.totalMilliseconds();
uint64_t max_batch_bytes_size = coordination_settings->max_requests_batch_bytes_size;
size_t max_batch_size = coordination_settings->max_requests_batch_size;
/// The code below do a very simple thing: batch all write (quorum) requests into vector until
/// previous write batch is not finished or max_batch size achieved. The main complexity goes from
@ -188,7 +196,6 @@ void KeeperDispatcher::requestThread()
return false;
};
size_t max_batch_size = coordination_settings->max_requests_batch_size;
while (!shutdown_called && current_batch.size() < max_batch_size && !has_reconfig_request
&& current_batch_bytes_size < max_batch_bytes_size && try_get_request())
;
@ -225,6 +232,12 @@ void KeeperDispatcher::requestThread()
/// Process collected write requests batch
if (!current_batch.empty())
{
if (current_batch.size() == max_batch_size)
ProfileEvents::increment(ProfileEvents::KeeperBatchMaxCount, 1);
if (current_batch_bytes_size == max_batch_bytes_size)
ProfileEvents::increment(ProfileEvents::KeeperBatchMaxTotalSize, 1);
LOG_TRACE(log, "Processing requests batch, size: {}, bytes: {}", current_batch.size(), current_batch_bytes_size);
auto result = server->putRequestBatch(current_batch);
@ -243,6 +256,8 @@ void KeeperDispatcher::requestThread()
/// If we will execute read or reconfig next, we have to process result now
if (execute_requests_after_write)
{
Stopwatch watch;
SCOPE_EXIT(ProfileEvents::increment(ProfileEvents::KeeperCommitWaitElapsedMicroseconds, watch.elapsedMicroseconds()));
if (prev_result)
result_buf = forceWaitAndProcessResult(
prev_result, prev_batch, /*clear_requests_on_success=*/!execute_requests_after_write);

View File

@ -1,12 +1,14 @@
#include <atomic>
#include <cerrno>
#include <chrono>
#include <Coordination/KeeperDispatcher.h>
#include <Coordination/KeeperReconfiguration.h>
#include <Coordination/KeeperSnapshotManager.h>
#include <Coordination/KeeperStateMachine.h>
#include <Coordination/KeeperDispatcher.h>
#include <Coordination/KeeperStorage.h>
#include <Coordination/KeeperReconfiguration.h>
#include <Coordination/ReadBufferFromNuraftBuffer.h>
#include <Coordination/WriteBufferFromNuraftBuffer.h>
#include <Disks/DiskLocal.h>
#include <IO/ReadHelpers.h>
#include <base/defines.h>
#include <base/errnoToString.h>
@ -17,7 +19,6 @@
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Common/logger_useful.h>
#include <Disks/DiskLocal.h>
namespace ProfileEvents
@ -31,6 +32,7 @@ namespace ProfileEvents
extern const Event KeeperSnapshotApplysFailed;
extern const Event KeeperReadSnapshot;
extern const Event KeeperSaveSnapshot;
extern const Event KeeperStorageLockWaitMicroseconds;
}
namespace DB
@ -151,6 +153,20 @@ void assertDigest(
}
}
struct TSA_SCOPED_LOCKABLE LockGuardWithStats final
{
std::unique_lock<std::mutex> lock;
explicit LockGuardWithStats(std::mutex & mutex) TSA_ACQUIRE(mutex)
{
Stopwatch watch;
std::unique_lock l(mutex);
ProfileEvents::increment(ProfileEvents::KeeperStorageLockWaitMicroseconds, watch.elapsedMicroseconds());
lock = std::move(l);
}
~LockGuardWithStats() TSA_RELEASE() = default;
};
}
nuraft::ptr<nuraft::buffer> KeeperStateMachine::pre_commit(uint64_t log_idx, nuraft::buffer & data)
@ -272,7 +288,7 @@ bool KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & req
if (op_num == Coordination::OpNum::SessionID || op_num == Coordination::OpNum::Reconfig)
return true;
std::lock_guard lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_and_responses_lock);
if (storage->isFinalized())
return false;
@ -302,7 +318,7 @@ bool KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & req
void KeeperStateMachine::reconfigure(const KeeperStorage::RequestForSession& request_for_session)
{
std::lock_guard _(storage_and_responses_lock);
LockGuardWithStats lock(storage_and_responses_lock);
KeeperStorage::ResponseForSession response = processReconfiguration(request_for_session);
if (!responses_queue.push(response))
{
@ -391,7 +407,7 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
if (!keeper_context->localLogsPreprocessed() && !preprocess(*request_for_session))
return nullptr;
auto try_push = [this](const KeeperStorage::ResponseForSession& response)
auto try_push = [&](const KeeperStorage::ResponseForSession& response)
{
if (!responses_queue.push(response))
{
@ -400,6 +416,17 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
"Failed to push response with session id {} to the queue, probably because of shutdown",
response.session_id);
}
using namespace std::chrono;
uint64_t elapsed = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count() - request_for_session->time;
if (elapsed > keeper_context->getCoordinationSettings()->log_slow_total_threshold_ms)
{
LOG_INFO(
log,
"Total time to process a request took too long ({}ms).\nRequest info: {}",
elapsed,
request_for_session->request->toString(/*short_format=*/true));
}
};
try
@ -417,7 +444,7 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
response_for_session.session_id = -1;
response_for_session.response = response;
std::lock_guard lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_and_responses_lock);
session_id = storage->getSessionID(session_id_request.session_timeout_ms);
LOG_DEBUG(log, "Session ID response {} with timeout {}", session_id, session_id_request.session_timeout_ms);
response->session_id = session_id;
@ -426,12 +453,13 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
else
{
if (op_num == Coordination::OpNum::Close)
{
std::lock_guard lock(request_cache_mutex);
parsed_request_cache.erase(request_for_session->session_id);
}
std::lock_guard lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_and_responses_lock);
KeeperStorage::ResponsesForSessions responses_for_sessions
= storage->processRequest(request_for_session->request, request_for_session->session_id, request_for_session->zxid);
for (auto & response_for_session : responses_for_sessions)
@ -482,7 +510,7 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
}
{ /// deserialize and apply snapshot to storage
std::lock_guard lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_and_responses_lock);
SnapshotDeserializationResult snapshot_deserialization_result;
if (latest_snapshot_ptr)
@ -534,7 +562,7 @@ void KeeperStateMachine::rollbackRequest(const KeeperStorage::RequestForSession
if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID)
return;
std::lock_guard lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_and_responses_lock);
storage->rollbackRequest(request_for_session.zxid, allow_missing);
}
@ -561,7 +589,7 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res
auto snapshot_meta_copy = nuraft::snapshot::deserialize(*snp_buf);
CreateSnapshotTask snapshot_task;
{ /// lock storage for a short period time to turn on "snapshot mode". After that we can read consistent storage state without locking.
std::lock_guard lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_and_responses_lock);
snapshot_task.snapshot = std::make_shared<KeeperStorageSnapshot>(storage.get(), snapshot_meta_copy, getClusterConfig());
}
@ -623,7 +651,7 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res
}
{
/// Destroy snapshot with lock
std::lock_guard lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_and_responses_lock);
LOG_TRACE(log, "Clearing garbage after snapshot");
/// Turn off "snapshot mode" and clear outdate part of storage state
storage->clearGarbageAfterSnapshot();
@ -764,7 +792,7 @@ int KeeperStateMachine::read_logical_snp_obj(
void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSession & request_for_session)
{
/// Pure local request, just process it with storage
std::lock_guard lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_and_responses_lock);
auto responses = storage->processRequest(
request_for_session.request, request_for_session.session_id, std::nullopt, true /*check_acl*/, true /*is_local*/);
for (const auto & response : responses)
@ -774,97 +802,97 @@ void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSessi
void KeeperStateMachine::shutdownStorage()
{
std::lock_guard lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_and_responses_lock);
storage->finalize();
}
std::vector<int64_t> KeeperStateMachine::getDeadSessions()
{
std::lock_guard lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_and_responses_lock);
return storage->getDeadSessions();
}
int64_t KeeperStateMachine::getNextZxid() const
{
std::lock_guard lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_and_responses_lock);
return storage->getNextZXID();
}
KeeperStorage::Digest KeeperStateMachine::getNodesDigest() const
{
std::lock_guard lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_and_responses_lock);
return storage->getNodesDigest(false);
}
uint64_t KeeperStateMachine::getLastProcessedZxid() const
{
std::lock_guard lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_and_responses_lock);
return storage->getZXID();
}
uint64_t KeeperStateMachine::getNodesCount() const
{
std::lock_guard lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_and_responses_lock);
return storage->getNodesCount();
}
uint64_t KeeperStateMachine::getTotalWatchesCount() const
{
std::lock_guard lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_and_responses_lock);
return storage->getTotalWatchesCount();
}
uint64_t KeeperStateMachine::getWatchedPathsCount() const
{
std::lock_guard lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_and_responses_lock);
return storage->getWatchedPathsCount();
}
uint64_t KeeperStateMachine::getSessionsWithWatchesCount() const
{
std::lock_guard lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_and_responses_lock);
return storage->getSessionsWithWatchesCount();
}
uint64_t KeeperStateMachine::getTotalEphemeralNodesCount() const
{
std::lock_guard lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_and_responses_lock);
return storage->getTotalEphemeralNodesCount();
}
uint64_t KeeperStateMachine::getSessionWithEphemeralNodesCount() const
{
std::lock_guard lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_and_responses_lock);
return storage->getSessionWithEphemeralNodesCount();
}
void KeeperStateMachine::dumpWatches(WriteBufferFromOwnString & buf) const
{
std::lock_guard lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_and_responses_lock);
storage->dumpWatches(buf);
}
void KeeperStateMachine::dumpWatchesByPath(WriteBufferFromOwnString & buf) const
{
std::lock_guard lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_and_responses_lock);
storage->dumpWatchesByPath(buf);
}
void KeeperStateMachine::dumpSessionsAndEphemerals(WriteBufferFromOwnString & buf) const
{
std::lock_guard lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_and_responses_lock);
storage->dumpSessionsAndEphemerals(buf);
}
uint64_t KeeperStateMachine::getApproximateDataSize() const
{
std::lock_guard lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_and_responses_lock);
return storage->getApproximateDataSize();
}
uint64_t KeeperStateMachine::getKeyArenaSize() const
{
std::lock_guard lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_and_responses_lock);
return storage->getArenaDataSize();
}
@ -905,7 +933,7 @@ ClusterConfigPtr KeeperStateMachine::getClusterConfig() const
void KeeperStateMachine::recalculateStorageStats()
{
std::lock_guard lock(storage_and_responses_lock);
LockGuardWithStats lock(storage_and_responses_lock);
LOG_INFO(log, "Recalculating storage stats");
storage->recalculateStats();
LOG_INFO(log, "Done recalculating storage stats");

View File

@ -182,8 +182,7 @@ private:
KeeperSnapshotManagerS3 * snapshot_manager_s3;
KeeperStorage::ResponseForSession processReconfiguration(
const KeeperStorage::RequestForSession& request_for_session)
KeeperStorage::ResponseForSession processReconfiguration(const KeeperStorage::RequestForSession & request_for_session)
TSA_REQUIRES(storage_and_responses_lock);
};
}

View File

@ -40,6 +40,8 @@ namespace ProfileEvents
extern const Event KeeperGetRequest;
extern const Event KeeperListRequest;
extern const Event KeeperExistsRequest;
extern const Event KeeperPreprocessElapsedMicroseconds;
extern const Event KeeperProcessElapsedMicroseconds;
}
namespace DB
@ -2309,6 +2311,20 @@ void KeeperStorage::preprocessRequest(
std::optional<Digest> digest,
int64_t log_idx)
{
Stopwatch watch;
SCOPE_EXIT({
auto elapsed = watch.elapsedMicroseconds();
if (auto elapsed_ms = elapsed / 1000; elapsed_ms > keeper_context->getCoordinationSettings()->log_slow_cpu_threshold_ms)
{
LOG_INFO(
getLogger("KeeperStorage"),
"Preprocessing a request took too long ({}ms).\nRequest info: {}",
elapsed_ms,
zk_request->toString(/*short_format=*/true));
}
ProfileEvents::increment(ProfileEvents::KeeperPreprocessElapsedMicroseconds, elapsed);
});
if (!initialized)
throw Exception(ErrorCodes::LOGICAL_ERROR, "KeeperStorage system nodes are not initialized");
@ -2409,6 +2425,20 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
bool check_acl,
bool is_local)
{
Stopwatch watch;
SCOPE_EXIT({
auto elapsed = watch.elapsedMicroseconds();
if (auto elapsed_ms = elapsed / 1000; elapsed_ms > keeper_context->getCoordinationSettings()->log_slow_cpu_threshold_ms)
{
LOG_INFO(
getLogger("KeeperStorage"),
"Processing a request took too long ({}ms).\nRequest info: {}",
elapsed_ms,
zk_request->toString(/*short_format=*/true));
}
ProfileEvents::increment(ProfileEvents::KeeperProcessElapsedMicroseconds, elapsed);
});
if (!initialized)
throw Exception(ErrorCodes::LOGICAL_ERROR, "KeeperStorage system nodes are not initialized");

View File

@ -3,6 +3,7 @@
#include <Common/HashTable/HashMap.h>
#include <Common/ArenaUtils.h>
#include <list>
namespace DB
{

View File

@ -59,6 +59,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{
{"24.7", {{"output_format_parquet_write_page_index", false, true, "Add a possibility to write page index into parquet files."},
{"parallel_replicas_local_plan", false, true, "Use local plan for local replica in a query with parallel replicas"},
{"input_format_json_ignore_key_case", false, false, "Ignore json key case while read json field from string."},
{"optimize_trivial_insert_select", true, false, "The optimization does not make sense in many cases."},
}},
{"24.6", {{"materialize_skip_indexes_on_insert", true, true, "Added new setting to allow to disable materialization of skip indexes on insert"},
@ -90,7 +91,6 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"output_format_csv_serialize_tuple_into_separate_columns", true, true, "A new way of how interpret tuples in CSV format was added."},
{"input_format_csv_deserialize_separate_columns_into_tuple", true, true, "A new way of how interpret tuples in CSV format was added."},
{"input_format_csv_try_infer_strings_from_quoted_tuples", true, true, "A new way of how interpret tuples in CSV format was added."},
{"input_format_json_ignore_key_case", false, false, "Ignore json key case while read json field from string."},
}},
{"24.5", {{"allow_deprecated_error_prone_window_functions", true, false, "Allow usage of deprecated error prone window functions (neighbor, runningAccumulate, runningDifferenceStartingWithFirstValue, runningDifference)"},
{"allow_experimental_join_condition", false, false, "Support join with inequal conditions which involve columns from both left and right table. e.g. t1.y < t2.y."},

View File

@ -13,11 +13,9 @@
#include <Common/setThreadName.h>
#include <Common/logger_useful.h>
#include <base/defines.h>
#include <chrono>
#include <Common/PipeFDs.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <queue>
#include <mutex>
#include <Coordination/FourLetterCommand.h>
#include <IO/CompressionMethod.h>
@ -30,6 +28,11 @@
#include <poll.h>
#endif
namespace ProfileEvents
{
extern const Event KeeperTotalElapsedMicroseconds;
}
namespace DB
{
@ -411,12 +414,12 @@ void KeeperTCPHandler::runImpl()
keeper_dispatcher->registerSession(session_id, response_callback);
Stopwatch logging_stopwatch;
auto operation_max_ms = keeper_dispatcher->getKeeperContext()->getCoordinationSettings()->log_slow_connection_operation_threshold_ms;
auto log_long_operation = [&](const String & operation)
{
constexpr UInt64 operation_max_ms = 500;
auto elapsed_ms = logging_stopwatch.elapsedMilliseconds();
if (operation_max_ms < elapsed_ms)
LOG_TEST(log, "{} for session {} took {} ms", operation, session_id, elapsed_ms);
LOG_INFO(log, "{} for session {} took {} ms", operation, session_id, elapsed_ms);
logging_stopwatch.restart();
};
@ -611,11 +614,13 @@ void KeeperTCPHandler::updateStats(Coordination::ZooKeeperResponsePtr & response
/// update statistics ignoring watch response and heartbeat.
if (response->xid != Coordination::WATCH_XID && response->getOpNum() != Coordination::OpNum::Heartbeat)
{
Int64 elapsed = (Poco::Timestamp() - operations[response->xid]) / 1000;
conn_stats.updateLatency(elapsed);
Int64 elapsed = (Poco::Timestamp() - operations[response->xid]);
ProfileEvents::increment(ProfileEvents::KeeperTotalElapsedMicroseconds, elapsed);
Int64 elapsed_ms = elapsed / 1000;
conn_stats.updateLatency(elapsed_ms);
operations.erase(response->xid);
keeper_dispatcher->updateKeeperStatLatency(elapsed);
keeper_dispatcher->updateKeeperStatLatency(elapsed_ms);
last_op.set(std::make_unique<LastOp>(LastOp{
.name = Coordination::toString(response->getOpNum()),

View File

@ -111,10 +111,12 @@ void ObjectStorageQueueSource::FileIterator::returnForRetry(Source::ObjectInfoPt
if (metadata->useBucketsForProcessing())
{
const auto bucket = metadata->getBucketForPath(object_info->relative_path);
std::lock_guard lock(mutex);
listed_keys_cache[bucket].keys.emplace_front(object_info);
}
else
{
std::lock_guard lock(mutex);
objects_to_retry.push_back(object_info);
}
}

View File

@ -293,6 +293,16 @@ def test_cmd_conf(started_cluster):
assert result["configuration_change_tries_count"] == "20"
assert result["async_replication"] == "true"
assert result["latest_logs_cache_size_threshold"] == "1073741824"
assert result["commit_logs_cache_size_threshold"] == "524288000"
assert result["disk_move_retries_wait_ms"] == "1000"
assert result["disk_move_retries_during_init"] == "100"
assert result["log_slow_total_threshold_ms"] == "5000"
assert result["log_slow_cpu_threshold_ms"] == "100"
assert result["log_slow_connection_operation_threshold_ms"] == "1000"
finally:
close_keeper_socket(client)