Merge branch 'master' into filecache-small-optimization

This commit is contained in:
Kseniia Sumarokova 2024-01-17 10:41:32 +01:00 committed by GitHub
commit 5be8644c8b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 184 additions and 144 deletions

View File

@ -25,6 +25,9 @@ public:
/// Is the user allowed to write a new entry into the cache?
virtual bool approveWrite(const UUID & user_id, size_t entry_size_in_bytes) const = 0;
/// Clears the policy contents
virtual void clear() = 0;
virtual ~ICachePolicyUserQuota() = default;
};
@ -38,6 +41,7 @@ public:
/// No-op quota policy: usage changes are ignored, every write is approved, and there is no state to clear.
void increaseActual(const UUID & /*user_id*/, size_t /*entry_size_in_bytes*/) override {}
void decreaseActual(const UUID & /*user_id*/, size_t /*entry_size_in_bytes*/) override {}
bool approveWrite(const UUID & /*user_id*/, size_t /*entry_size_in_bytes*/) const override { return true; }
void clear() override {}
};

View File

@ -32,6 +32,10 @@ std::atomic<bool> show_addresses = true;
bool shouldShowAddress(const void * addr)
{
/// Likely inline frame
if (!addr)
return false;
/// If the address is less than 4096, most likely it is a nullptr dereference with offset,
/// and showing this offset is secure nevertheless.
/// NOTE: 4096 is the page size on x86 and it can be different on other systems,
@ -203,20 +207,24 @@ static void * getCallerAddress(const ucontext_t & context)
#endif
}
// FIXME: looks like this is used only for Sentry but duplicates the whole algo, maybe replace?
void StackTrace::symbolize(
const StackTrace::FramePointers & frame_pointers, [[maybe_unused]] size_t offset, size_t size, StackTrace::Frames & frames)
void StackTrace::forEachFrame(
const StackTrace::FramePointers & frame_pointers,
size_t offset,
size_t size,
std::function<void(const Frame &)> callback,
bool fatal)
{
#if defined(__ELF__) && !defined(OS_FREEBSD)
const DB::SymbolIndex & symbol_index = DB::SymbolIndex::instance();
std::unordered_map<std::string, DB::Dwarf> dwarfs;
for (size_t i = 0; i < offset; ++i)
frames[i].virtual_addr = frame_pointers[i];
using enum DB::Dwarf::LocationInfoMode;
const auto mode = fatal ? FULL_WITH_INLINE : FAST;
for (size_t i = offset; i < size; ++i)
{
StackTrace::Frame & current_frame = frames[i];
StackTrace::Frame current_frame;
std::vector<DB::Dwarf::SymbolizedFrame> inline_frames;
current_frame.virtual_addr = frame_pointers[i];
const auto * object = symbol_index.findObject(current_frame.virtual_addr);
uintptr_t virtual_offset = object ? uintptr_t(object->address_begin) : 0;
@ -230,26 +238,41 @@ void StackTrace::symbolize(
auto dwarf_it = dwarfs.try_emplace(object->name, object->elf).first;
DB::Dwarf::LocationInfo location;
std::vector<DB::Dwarf::SymbolizedFrame> inline_frames;
if (dwarf_it->second.findAddress(
uintptr_t(current_frame.physical_addr), location, DB::Dwarf::LocationInfoMode::FAST, inline_frames))
uintptr_t(current_frame.physical_addr), location, mode, inline_frames))
{
current_frame.file = location.file.toString();
current_frame.line = location.line;
}
}
}
else
current_frame.object = "?";
if (const auto * symbol = symbol_index.findSymbol(current_frame.virtual_addr))
current_frame.symbol = demangle(symbol->name);
else
current_frame.symbol = "?";
for (const auto & frame : inline_frames)
{
StackTrace::Frame current_inline_frame;
const String file_for_inline_frame = frame.location.file.toString();
current_inline_frame.file = "inlined from " + file_for_inline_frame;
current_inline_frame.line = frame.location.line;
current_inline_frame.symbol = frame.name;
callback(current_inline_frame);
}
callback(current_frame);
}
#else
for (size_t i = 0; i < size; ++i)
frames[i].virtual_addr = frame_pointers[i];
UNUSED(fatal);
for (size_t i = offset; i < size; ++i)
{
StackTrace::Frame current_frame;
current_frame.virtual_addr = frame_pointers[i];
callback(current_frame);
}
#endif
}
@ -349,72 +372,52 @@ toStringEveryLineImpl([[maybe_unused]] bool fatal, const StackTraceRefTriple & s
if (stack_trace.size == 0)
return callback("<Empty trace>");
size_t frame_index = stack_trace.offset;
#if defined(__ELF__) && !defined(OS_FREEBSD)
using enum DB::Dwarf::LocationInfoMode;
const auto mode = fatal ? FULL_WITH_INLINE : FAST;
const DB::SymbolIndex & symbol_index = DB::SymbolIndex::instance();
std::unordered_map<String, DB::Dwarf> dwarfs;
for (size_t i = stack_trace.offset; i < stack_trace.size; ++i)
size_t inline_frame_index = 0;
auto callback_wrapper = [&](const StackTrace::Frame & frame)
{
std::vector<DB::Dwarf::SymbolizedFrame> inline_frames;
const void * virtual_addr = stack_trace.pointers[i];
const auto * object = symbol_index.findObject(virtual_addr);
uintptr_t virtual_offset = object ? uintptr_t(object->address_begin) : 0;
const void * physical_addr = reinterpret_cast<const void *>(uintptr_t(virtual_addr) - virtual_offset);
DB::WriteBufferFromOwnString out;
out << i << ". ";
String file;
if (std::error_code ec; object && std::filesystem::exists(object->name, ec) && !ec)
/// Inline frame
if (!frame.virtual_addr)
{
auto dwarf_it = dwarfs.try_emplace(object->name, object->elf).first;
DB::Dwarf::LocationInfo location;
if (dwarf_it->second.findAddress(uintptr_t(physical_addr), location, mode, inline_frames))
{
file = location.file.toString();
out << file << ":" << location.line << ": ";
}
out << frame_index << "." << inline_frame_index++ << ". ";
}
else
{
out << frame_index++ << ". ";
inline_frame_index = 0;
}
if (const auto * const symbol = symbol_index.findSymbol(virtual_addr))
out << demangleAndCollapseNames(file, symbol->name);
if (frame.file.has_value() && frame.line.has_value())
out << *frame.file << ':' << *frame.line << ": ";
if (frame.symbol.has_value() && frame.file.has_value())
out << demangleAndCollapseNames(*frame.file, frame.symbol->data());
else
out << "?";
if (shouldShowAddress(physical_addr))
if (shouldShowAddress(frame.physical_addr))
{
out << " @ ";
DB::writePointerHex(physical_addr, out);
DB::writePointerHex(frame.physical_addr, out);
}
out << " in " << (object ? object->name : "?");
for (size_t j = 0; j < inline_frames.size(); ++j)
{
const auto & frame = inline_frames[j];
const String file_for_inline_frame = frame.location.file.toString();
callback(fmt::format(
"{}.{}. inlined from {}:{}: {}",
i,
j + 1,
file_for_inline_frame,
frame.location.line,
demangleAndCollapseNames(file_for_inline_frame, frame.name)));
}
if (frame.object.has_value())
out << " in " << *frame.object;
callback(out.str());
}
};
#else
for (size_t i = stack_trace.offset; i < stack_trace.size; ++i)
if (const void * const addr = stack_trace.pointers[i]; shouldShowAddress(addr))
callback(fmt::format("{}. {}", i, addr));
auto callback_wrapper = [&](const StackTrace::Frame & frame)
{
if (frame.virtual_addr && shouldShowAddress(frame.virtual_addr))
callback(fmt::format("{}. {}", frame_index++, frame.virtual_addr));
};
#endif
StackTrace::forEachFrame(stack_trace.pointers, stack_trace.offset, stack_trace.size, callback_wrapper, fatal);
}
void StackTrace::toStringEveryLine(std::function<void(std::string_view)> callback) const

View File

@ -62,7 +62,14 @@ public:
static std::string toString(void ** frame_pointers, size_t offset, size_t size);
static void dropCache();
static void symbolize(const FramePointers & frame_pointers, size_t offset, size_t size, StackTrace::Frames & frames);
/// Walks frame_pointers[offset, size), symbolizes each address and hands the resulting Frame to `callback`.
/// On ELF (non-FreeBSD) builds, inline frames found for an address are passed to `callback` before the
/// frame of that address itself; they carry only file ("inlined from ..."), line and symbol.
/// On other builds only virtual_addr is filled in.
/// @param fatal - if true, will process inline frames (slower)
static void forEachFrame(
const FramePointers & frame_pointers,
size_t offset,
size_t size,
std::function<void(const Frame &)> callback,
bool fatal);
void toStringEveryLine(std::function<void(std::string_view)> callback) const;
static void toStringEveryLine(const FramePointers & frame_pointers, std::function<void(std::string_view)> callback);

View File

@ -38,12 +38,12 @@ public:
bool approveWrite(const UUID & user_id, size_t entry_size_in_bytes) const override
{
auto it_actual = actual.find(user_id);
Resources actual_for_user{.size_in_bytes = 0, .num_items = 0}; /// assume zero actual resource consumption is user isn't found
Resources actual_for_user{.size_in_bytes = 0, .num_items = 0}; /// if no user is found, the default is no resource consumption
if (it_actual != actual.end())
actual_for_user = it_actual->second;
auto it_quota = quotas.find(user_id);
Resources quota_for_user{.size_in_bytes = std::numeric_limits<size_t>::max(), .num_items = std::numeric_limits<size_t>::max()}; /// assume no threshold if no quota is found
Resources quota_for_user{.size_in_bytes = std::numeric_limits<size_t>::max(), .num_items = std::numeric_limits<size_t>::max()}; /// if no user is found, the default is no threshold
if (it_quota != quotas.end())
quota_for_user = it_quota->second;
@ -54,16 +54,21 @@ public:
quota_for_user.num_items = std::numeric_limits<UInt64>::max();
/// Check size quota
if (actual_for_user.size_in_bytes + entry_size_in_bytes >= quota_for_user.size_in_bytes)
if (actual_for_user.size_in_bytes + entry_size_in_bytes > quota_for_user.size_in_bytes)
return false;
/// Check items quota
if (quota_for_user.num_items + 1 >= quota_for_user.num_items)
if (actual_for_user.num_items + 1 > quota_for_user.num_items)
return false;
return true;
}
/// Forgets the recorded per-user usage (`actual`); the `quotas` map is left as is.
void clear() override
{
actual.clear();
}
struct Resources
{
size_t size_in_bytes = 0;
@ -125,6 +130,7 @@ public:
/// Drops every cached entry and then calls clear() on the user-quota policy,
/// so both are emptied by the same call.
void clear() override
{
cache.clear();
Base::user_quotas->clear();
}
void remove(const Key & key) override

View File

@ -810,7 +810,7 @@ class IColumn;
M(Bool, parallelize_output_from_storages, true, "Parallelize output for reading step from storage. It allows parallelizing query processing right after reading from storage if possible", 0) \
M(String, insert_deduplication_token, "", "If not empty, used for duplicate detection instead of data digest", 0) \
M(Bool, count_distinct_optimization, false, "Rewrite count distinct to subquery of group by", 0) \
M(Bool, throw_if_no_data_to_insert, true, "Enables or disables empty INSERTs, enabled by default", 0) \
M(Bool, throw_if_no_data_to_insert, true, "Allows or forbids empty INSERTs, enabled by default (throws an error on an empty insert)", 0) \
M(Bool, compatibility_ignore_auto_increment_in_create_table, false, "Ignore AUTO_INCREMENT keyword in column declaration if true, otherwise return error. It simplifies migration from MySQL", 0) \
M(Bool, multiple_joins_try_to_keep_original_names, false, "Do not add aliases to top level expression list on multiple joins rewrite", 0) \
M(Bool, optimize_sorting_by_input_stream_properties, true, "Optimize sorting by sorting properties of input stream", 0) \

View File

@ -169,11 +169,9 @@ void SentryWriter::onFault(int sig, const std::string & error_message, const Sta
};
StackTrace::Frames frames;
StackTrace::symbolize(stack_trace.getFramePointers(), offset, stack_size, frames);
for (ssize_t i = stack_size - 1; i >= offset; --i)
auto sentry_add_stack_trace = [&](const StackTrace::Frame & current_frame)
{
const StackTrace::Frame & current_frame = frames[i];
sentry_value_t sentry_frame = sentry_value_new_object();
UInt64 frame_ptr = reinterpret_cast<UInt64>(current_frame.virtual_addr);
@ -190,7 +188,9 @@ void SentryWriter::onFault(int sig, const std::string & error_message, const Sta
sentry_value_set_by_key(sentry_frame, "lineno", sentry_value_new_int32(static_cast<int32_t>(current_frame.line.value())));
sentry_value_append(sentry_frames, sentry_frame);
}
};
StackTrace::forEachFrame(stack_trace.getFramePointers(), offset, stack_size, sentry_add_stack_trace, /* fatal= */ true);
}
/// Prepare data for https://develop.sentry.dev/sdk/event-payloads/threads/

View File

@ -64,7 +64,7 @@ static const size_t SSL_REQUEST_PAYLOAD_SIZE = 32;
static String selectEmptyReplacementQuery(const String & query);
static String showTableStatusReplacementQuery(const String & query);
static String killConnectionIdReplacementQuery(const String & query);
static std::optional<String> setSettingReplacementQuery(const String & query, const String & mysql_setting, const String & native_setting);
static String selectLimitReplacementQuery(const String & query);
MySQLHandler::MySQLHandler(
IServer & server_,
@ -86,12 +86,10 @@ MySQLHandler::MySQLHandler(
if (ssl_enabled)
server_capabilities |= CLIENT_SSL;
queries_replacements.emplace("KILL QUERY", killConnectionIdReplacementQuery);
queries_replacements.emplace("SHOW TABLE STATUS LIKE", showTableStatusReplacementQuery);
queries_replacements.emplace("SHOW VARIABLES", selectEmptyReplacementQuery);
settings_replacements.emplace("SQL_SELECT_LIMIT", "limit");
settings_replacements.emplace("NET_WRITE_TIMEOUT", "send_timeout");
settings_replacements.emplace("NET_READ_TIMEOUT", "receive_timeout");
replacements.emplace("KILL QUERY", killConnectionIdReplacementQuery);
replacements.emplace("SHOW TABLE STATUS LIKE", showTableStatusReplacementQuery);
replacements.emplace("SHOW VARIABLES", selectEmptyReplacementQuery);
replacements.emplace("SET SQL_SELECT_LIMIT", selectLimitReplacementQuery);
}
void MySQLHandler::run()
@ -340,30 +338,16 @@ void MySQLHandler::comQuery(ReadBuffer & payload, bool binary_protocol)
bool should_replace = false;
bool with_output = false;
// Queries replacements
for (auto const & [query_to_replace, replacement_fn] : queries_replacements)
for (auto const & x : replacements)
{
if (0 == strncasecmp(query_to_replace.c_str(), query.c_str(), query_to_replace.size()))
if (0 == strncasecmp(x.first.c_str(), query.c_str(), x.first.size()))
{
should_replace = true;
replacement_query = replacement_fn(query);
replacement_query = x.second(query);
break;
}
}
// Settings replacements
if (!should_replace)
for (auto const & [mysql_setting, native_setting] : settings_replacements)
{
const auto replacement_query_opt = setSettingReplacementQuery(query, mysql_setting, native_setting);
if (replacement_query_opt.has_value())
{
should_replace = true;
replacement_query = replacement_query_opt.value();
break;
}
}
ReadBufferFromString replacement(replacement_query);
auto query_context = session->makeQueryContext();
@ -601,12 +585,12 @@ static String showTableStatusReplacementQuery(const String & query)
return query;
}
static std::optional<String> setSettingReplacementQuery(const String & query, const String & mysql_setting, const String & native_setting)
static String selectLimitReplacementQuery(const String & query)
{
const String prefix = "SET " + mysql_setting;
if (0 == strncasecmp(prefix.c_str(), query.c_str(), prefix.size()))
return "SET " + native_setting + String(query.data() + prefix.length());
return std::nullopt;
const String prefix = "SET SQL_SELECT_LIMIT";
/// NOTE(review): starts_with() is case-sensitive, whereas the dispatch loop in comQuery picks this
/// function with strncasecmp(). A lowercase `set sql_select_limit = N` therefore reaches this point
/// and is returned unchanged instead of being rewritten to `SET limit ...` - confirm this is intended.
if (query.starts_with(prefix))
return "SET limit" + std::string(query.data() + prefix.length());
return query;
}
/// Replace "KILL QUERY [connection_id]" into "KILL QUERY WHERE query_id LIKE 'mysql:[connection_id]:xxx'".

View File

@ -92,12 +92,9 @@ protected:
MySQLProtocol::PacketEndpointPtr packet_endpoint;
std::unique_ptr<Session> session;
using QueryReplacementFn = std::function<String(const String & query)>;
using QueriesReplacements = std::unordered_map<std::string, QueryReplacementFn>;
QueriesReplacements queries_replacements;
using SettingsReplacements = std::unordered_map<std::string, std::string>;
SettingsReplacements settings_replacements;
using ReplacementFn = std::function<String(const String & query)>;
using Replacements = std::unordered_map<std::string, ReplacementFn>;
Replacements replacements;
std::mutex prepared_statements_mutex;
UInt32 current_prepared_statement_id TSA_GUARDED_BY(prepared_statements_mutex) = 0;

View File

@ -2690,7 +2690,7 @@ ReplicatedMergeTreeQueue::addSubscriber(ReplicatedMergeTreeQueue::SubscriberCall
}
}
LOG_TEST(log, "Waiting for {} entries to be processed: {}", out_entry_names.size(), fmt::join(out_entry_names, ", "));
LOG_TRACE(log, "Waiting for {} entries to be processed: {}", out_entry_names.size(), fmt::join(out_entry_names, ", "));
}
auto it = subscribers.emplace(subscribers.end(), std::move(callback));

View File

@ -1079,7 +1079,7 @@ def test_startup_without_zk(started_cluster):
err = main_node.query_and_get_error(
"CREATE DATABASE startup ENGINE = Replicated('/clickhouse/databases/startup', 'shard1', 'replica1');"
)
assert "ZooKeeper" in err
assert "ZooKeeper" in err or "Coordination::Exception" in err
main_node.query(
"CREATE DATABASE startup ENGINE = Replicated('/clickhouse/databases/startup', 'shard1', 'replica1');"
)

View File

@ -0,0 +1,13 @@
a
b
1
c
d
3
--
a
b
1
c
d
3

View File

@ -0,0 +1,41 @@
-- Tags: no-parallel
-- Tag no-parallel: Messes with internal cache
-- Tests per-user quotas of the query cache. Settings 'query_cache_max_size_in_bytes' and 'query_cache_max_entries' are actually supposed to
-- be used in a settings profile, together with a readonly constraint. For simplicity, the quota is set stand-alone in a stateless test
-- instead of an integration test - the relevant logic is still covered that way.
-- Only 'query_cache_max_entries' is exercised below.
SYSTEM DROP QUERY CACHE;
-- Run SELECTs with a quota that allows the current user to write only 1 entry into the query cache
SET query_cache_max_entries = 1;
SELECT 'a' SETTINGS use_query_cache = true;
SELECT 'b' SETTINGS use_query_cache = true;
SELECT count(*) FROM system.query_cache; -- expect 1 entry
-- Run SELECTs again, this time without a quota
SET query_cache_max_entries = DEFAULT;
SELECT 'c' SETTINGS use_query_cache = true;
SELECT 'd' SETTINGS use_query_cache = true;
SELECT count(*) FROM system.query_cache; -- expect 3 entries
SYSTEM DROP QUERY CACHE;
-- Run the same as above after a DROP QUERY CACHE.
-- Presumably this verifies that dropping the cache also resets the per-user usage accounting - confirm.
SELECT '--';
SET query_cache_max_entries = 1;
SELECT 'a' SETTINGS use_query_cache = true;
SELECT 'b' SETTINGS use_query_cache = true;
SELECT count(*) FROM system.query_cache; -- expect 1 entry
-- Run SELECTs again, this time without a quota
SET query_cache_max_entries = DEFAULT;
SELECT 'c' SETTINGS use_query_cache = true;
SELECT 'd' SETTINGS use_query_cache = true;
SELECT count(*) FROM system.query_cache; -- expect 3 entries
SYSTEM DROP QUERY CACHE;
-- SELECT '---';

View File

@ -0,0 +1,20 @@
-- Both queries read from a view() over system.one through remote('127.0.0.{1,2}', ...),
-- combining GROUPING SETS with toDate() calls that receive NULL arguments.
-- NOTE(review): no description is visible here; presumably these are fuzzer-found queries that only
-- need to complete without a server-side failure - confirm against the reference file.
SELECT NULL AND (toDate(-2147483647, NULL) AND NULL)
FROM remote('127.0.0.{1,2}', view(
SELECT
NULL AND NULL,
NULL,
toDate(toDate('0.0001048577', toDate(NULL, 10 AND (toDate(257, 9223372036854775807, NULL) AND NULL AND NULL) AND NULL, 7, NULL), NULL, NULL) AND NULL AND -2147483648, NULL, NULL) AND NULL
FROM system.one
WHERE toDate(toDate(NULL, NULL, NULL), NULL)
GROUP BY
GROUPING SETS ((NULL))
));
SELECT NULL AND (toDate(-2147483647, NULL) AND NULL)
FROM remote('127.0.0.{1,2}', view(
SELECT NULL
FROM system.one
WHERE toDate(toDate(NULL, NULL, NULL), NULL)
GROUP BY
GROUPING SETS ((''))
));

View File

@ -1,14 +0,0 @@
-- Uppercase tests
name value
limit 11
name value
send_timeout 22
name value
receive_timeout 33
-- Lowercase tests
name value
limit 44
name value
send_timeout 55
name value
receive_timeout 66

View File

@ -1,21 +0,0 @@
#!/usr/bin/env bash
# Tags: no-fasttest
# Tag no-fasttest: requires mysql client
# Tests the override of certain MySQL proprietary settings to ClickHouse native settings
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
CHANGED_SETTINGS_QUERY="SELECT name, value FROM system.settings WHERE name IN ('limit', 'send_timeout', 'receive_timeout') AND changed;"
echo "-- Uppercase tests"
${MYSQL_CLIENT} --execute "SET SQL_SELECT_LIMIT = 11; $CHANGED_SETTINGS_QUERY"
${MYSQL_CLIENT} --execute "SET NET_WRITE_TIMEOUT = 22; $CHANGED_SETTINGS_QUERY"
${MYSQL_CLIENT} --execute "SET NET_READ_TIMEOUT = 33; $CHANGED_SETTINGS_QUERY"
echo "-- Lowercase tests"
${MYSQL_CLIENT} --execute "set sql_select_limit=44; $CHANGED_SETTINGS_QUERY"
${MYSQL_CLIENT} --execute "set net_write_timeout=55; $CHANGED_SETTINGS_QUERY"
${MYSQL_CLIENT} --execute "set net_read_timeout=66; $CHANGED_SETTINGS_QUERY"