mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-21 01:00:48 +00:00
Merge branch 'master' into debug_keeper
This commit is contained in:
commit
8246aacb90
@ -565,6 +565,8 @@ void LocalServer::processConfig()
|
||||
global_context->setCurrentDatabase(default_database);
|
||||
applyCmdOptions(global_context);
|
||||
|
||||
bool enable_objects_loader = false;
|
||||
|
||||
if (config().has("path"))
|
||||
{
|
||||
String path = global_context->getPath();
|
||||
@ -576,6 +578,7 @@ void LocalServer::processConfig()
|
||||
LOG_DEBUG(log, "Loading user defined objects from {}", path);
|
||||
Poco::File(path + "user_defined/").createDirectories();
|
||||
UserDefinedSQLObjectsLoader::instance().loadObjects(global_context);
|
||||
enable_objects_loader = true;
|
||||
LOG_DEBUG(log, "Loaded user defined objects.");
|
||||
|
||||
LOG_DEBUG(log, "Loading metadata from {}", path);
|
||||
@ -599,6 +602,9 @@ void LocalServer::processConfig()
|
||||
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE));
|
||||
}
|
||||
|
||||
/// Persist SQL user defined objects only if user_defined folder was created
|
||||
UserDefinedSQLObjectsLoader::instance().enable(enable_objects_loader);
|
||||
|
||||
server_display_name = config().getString("display_name", getFQDNOrHostName());
|
||||
prompt_by_server_display_name = config().getRawString("prompt_by_server_display_name.default", "{display_name} :) ");
|
||||
std::map<String, String> prompt_substitutions{{"display_name", server_display_name}};
|
||||
|
@ -23,7 +23,7 @@ static constexpr auto NS = 1000000000UL;
|
||||
|
||||
/// Tracking window. Actually the size is not really important. We just want to avoid
|
||||
/// throttles when there are no actions for a long period time.
|
||||
static const double window_ns = 7UL * NS;
|
||||
static const double window_ns = 1UL * NS;
|
||||
|
||||
void Throttler::add(size_t amount)
|
||||
{
|
||||
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@ -36,7 +36,7 @@ BlockIO InterpreterCreateFunctionQuery::execute()
|
||||
|
||||
UserDefinedSQLFunctionFactory::instance().registerFunction(function_name, query_ptr);
|
||||
|
||||
if (!is_internal)
|
||||
if (!persist_function)
|
||||
{
|
||||
try
|
||||
{
|
||||
|
@ -11,10 +11,10 @@ class Context;
|
||||
class InterpreterCreateFunctionQuery : public IInterpreter, WithContext
|
||||
{
|
||||
public:
|
||||
InterpreterCreateFunctionQuery(const ASTPtr & query_ptr_, ContextPtr context_, bool is_internal_)
|
||||
InterpreterCreateFunctionQuery(const ASTPtr & query_ptr_, ContextPtr context_, bool persist_function_)
|
||||
: WithContext(context_)
|
||||
, query_ptr(query_ptr_)
|
||||
, is_internal(is_internal_) {}
|
||||
, persist_function(persist_function_) {}
|
||||
|
||||
BlockIO execute() override;
|
||||
|
||||
@ -26,7 +26,7 @@ private:
|
||||
static void validateFunctionRecursiveness(ASTPtr node, const String & function_to_create);
|
||||
|
||||
ASTPtr query_ptr;
|
||||
bool is_internal;
|
||||
bool persist_function;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -83,6 +83,9 @@ void UserDefinedSQLObjectsLoader::loadUserDefinedObject(ContextPtr context, User
|
||||
|
||||
void UserDefinedSQLObjectsLoader::loadObjects(ContextPtr context)
|
||||
{
|
||||
if (unlikely(!enable_persistence))
|
||||
return;
|
||||
|
||||
LOG_DEBUG(log, "loading user defined objects");
|
||||
|
||||
String dir_path = context->getPath() + "user_defined/";
|
||||
@ -110,6 +113,9 @@ void UserDefinedSQLObjectsLoader::loadObjects(ContextPtr context)
|
||||
|
||||
void UserDefinedSQLObjectsLoader::storeObject(ContextPtr context, UserDefinedSQLObjectType object_type, const String & object_name, const IAST & ast)
|
||||
{
|
||||
if (unlikely(!enable_persistence))
|
||||
return;
|
||||
|
||||
String dir_path = context->getPath() + "user_defined/";
|
||||
String file_path;
|
||||
|
||||
@ -143,6 +149,9 @@ void UserDefinedSQLObjectsLoader::storeObject(ContextPtr context, UserDefinedSQL
|
||||
|
||||
void UserDefinedSQLObjectsLoader::removeObject(ContextPtr context, UserDefinedSQLObjectType object_type, const String & object_name)
|
||||
{
|
||||
if (unlikely(!enable_persistence))
|
||||
return;
|
||||
|
||||
String dir_path = context->getPath() + "user_defined/";
|
||||
LOG_DEBUG(log, "Removing file for user defined object {} from {}", backQuote(object_name), dir_path);
|
||||
|
||||
@ -162,4 +171,9 @@ void UserDefinedSQLObjectsLoader::removeObject(ContextPtr context, UserDefinedSQ
|
||||
std::filesystem::remove(file_path);
|
||||
}
|
||||
|
||||
void UserDefinedSQLObjectsLoader::enable(bool enable_persistence_)
|
||||
{
|
||||
enable_persistence = enable_persistence_;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -24,10 +24,14 @@ public:
|
||||
void storeObject(ContextPtr context, UserDefinedSQLObjectType object_type, const String & object_name, const IAST & ast);
|
||||
void removeObject(ContextPtr context, UserDefinedSQLObjectType object_type, const String & object_name);
|
||||
|
||||
/// For ClickHouse local if path is not set we can disable loader.
|
||||
void enable(bool enable_persistence);
|
||||
|
||||
private:
|
||||
|
||||
void loadUserDefinedObject(ContextPtr context, UserDefinedSQLObjectType object_type, const std::string_view & object_name, const String & file_path);
|
||||
Poco::Logger * log;
|
||||
bool enable_persistence = true;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -24,6 +24,7 @@
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include "StorageLogSettings.h"
|
||||
#include <Processors/Sources/SourceWithProgress.h>
|
||||
#include <Processors/Sources/NullSource.h>
|
||||
#include <Processors/Pipe.h>
|
||||
#include <Processors/Sinks/SinkToStorage.h>
|
||||
|
||||
@ -122,9 +123,6 @@ Chunk LogSource::generate()
|
||||
if (rows_read == rows_limit)
|
||||
return {};
|
||||
|
||||
if (storage.file_checker.empty())
|
||||
return {};
|
||||
|
||||
/// How many rows to read for the next block.
|
||||
size_t max_rows_to_read = std::min(block_size, rows_limit - rows_read);
|
||||
std::unordered_map<String, ISerialization::SubstreamsCache> caches;
|
||||
@ -185,7 +183,10 @@ void LogSource::readData(const NameAndTypePair & name_and_type, ColumnPtr & colu
|
||||
|
||||
UInt64 offset = 0;
|
||||
if (!stream_for_prefix && mark_number)
|
||||
{
|
||||
std::lock_guard marks_lock(file_it->second.marks_mutex);
|
||||
offset = file_it->second.marks[mark_number].offset;
|
||||
}
|
||||
|
||||
auto & data_file_path = file_it->second.data_file_path;
|
||||
auto it = streams.try_emplace(stream_name, storage.disk, data_file_path, offset, read_settings).first;
|
||||
@ -459,7 +460,10 @@ void LogSink::writeMarks(MarksForColumns && marks)
|
||||
writeIntBinary(mark.second.offset, *marks_stream);
|
||||
|
||||
size_t column_index = mark.first;
|
||||
storage.files[storage.column_names_by_idx[column_index]].marks.push_back(mark.second);
|
||||
|
||||
auto & file = storage.files[storage.column_names_by_idx[column_index]];
|
||||
std::lock_guard marks_lock(file.marks_mutex);
|
||||
file.marks.push_back(mark.second);
|
||||
}
|
||||
}
|
||||
|
||||
@ -672,6 +676,9 @@ Pipe StorageLog::read(
|
||||
if (!lock)
|
||||
throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
|
||||
|
||||
if (file_checker.empty())
|
||||
return Pipe(std::make_shared<NullSource>(metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID())));
|
||||
|
||||
Pipes pipes;
|
||||
|
||||
const Marks & marks = getMarksWithRealRowCount(metadata_snapshot);
|
||||
|
@ -82,6 +82,8 @@ private:
|
||||
size_t column_index;
|
||||
|
||||
String data_file_path;
|
||||
|
||||
std::mutex marks_mutex;
|
||||
Marks marks;
|
||||
};
|
||||
using Files = std::map<String, ColumnData>; /// file name -> column data
|
||||
|
@ -98,9 +98,6 @@ public:
|
||||
protected:
|
||||
Chunk generate() override
|
||||
{
|
||||
if (storage.file_checker.empty())
|
||||
return {};
|
||||
|
||||
Block res;
|
||||
start();
|
||||
|
||||
@ -337,7 +334,7 @@ Pipe StorageStripeLog::read(
|
||||
Pipes pipes;
|
||||
|
||||
String index_file = table_path + "index.mrk";
|
||||
if (!disk->exists(index_file))
|
||||
if (file_checker.empty() || !disk->exists(index_file))
|
||||
{
|
||||
return Pipe(std::make_shared<NullSource>(metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID())));
|
||||
}
|
||||
|
@ -2,6 +2,17 @@
|
||||
# pylint: disable=redefined-outer-name
|
||||
# pylint: disable=line-too-long
|
||||
|
||||
# This test verifies that memory tracking does not have significant drift,
|
||||
# in other words, every allocation should be taken into account at the global
|
||||
# memory tracker.
|
||||
#
|
||||
# So we are running some queries with GROUP BY to make some allocations,
|
||||
# and after we are checking MemoryTracking metric from system.metrics,
|
||||
# and check that it does not changes too frequently.
|
||||
#
|
||||
# Also note, that syncing MemoryTracking with RSS had been disabled in
|
||||
# asynchronous_metrics_update_period_s.xml.
|
||||
|
||||
import logging
|
||||
import pytest
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
@ -46,6 +57,8 @@ def get_MemoryTracking():
|
||||
return int(http_query("SELECT value FROM system.metrics WHERE metric = 'MemoryTracking'"))
|
||||
|
||||
def check_memory(memory):
|
||||
# bytes -> megabytes
|
||||
memory = [*map(lambda x: int(int(x)/1024/1024), memory)]
|
||||
# 3 changes to MemoryTracking is minimum, since:
|
||||
# - this is not that high to not detect inacuracy
|
||||
# - memory can go like X/X+N due to some background allocations
|
||||
|
Loading…
Reference in New Issue
Block a user