Merge branch 'master' into execute-query-flush-format-2

This commit is contained in:
Alexey Milovidov 2020-04-20 09:19:34 +03:00
commit 70054e90d6
31 changed files with 539 additions and 77 deletions

View File

@ -81,6 +81,7 @@ namespace CurrentMetrics
{
extern const Metric Revision;
extern const Metric VersionInteger;
extern const Metric MemoryTracking;
}
namespace
@ -555,6 +556,28 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->setFormatSchemaPath(format_schema_path.path());
format_schema_path.createDirectories();
/// Limit on total memory usage
size_t max_server_memory_usage = settings.max_server_memory_usage;
double max_server_memory_usage_to_ram_ratio = config().getDouble("max_server_memory_usage_to_ram_ratio", 0.9);
size_t default_max_server_memory_usage = memory_amount * max_server_memory_usage_to_ram_ratio;
if (max_server_memory_usage == 0)
{
max_server_memory_usage = default_max_server_memory_usage;
LOG_INFO(log, "Setting max_server_memory_usage was set to " << formatReadableSizeWithBinarySuffix(max_server_memory_usage));
}
else if (max_server_memory_usage > default_max_server_memory_usage)
{
max_server_memory_usage = default_max_server_memory_usage;
LOG_INFO(log, "Setting max_server_memory_usage was lowered to " << formatReadableSizeWithBinarySuffix(max_server_memory_usage)
<< " because the system has low amount of memory");
}
total_memory_tracker.setOrRaiseHardLimit(max_server_memory_usage);
total_memory_tracker.setDescription("(total)");
total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking);
LOG_INFO(log, "Loading metadata from " + path);
try

View File

@ -30,7 +30,7 @@
M(QueryThread, "Number of query processing threads") \
M(ReadonlyReplica, "Number of Replicated tables that are currently in readonly state due to re-initialization after ZooKeeper session loss or due to startup without ZooKeeper configured.") \
M(LeaderReplica, "Number of Replicated tables that are leaders. Leader replica is responsible for assigning merges, cleaning old blocks for deduplications and a few more bookkeeping tasks. There may be no more than one leader across all replicas at one moment of time. If there is no leader it will be elected soon or it indicate an issue.") \
M(MemoryTracking, "Total amount of memory (bytes) allocated in currently executing queries. Note that some memory allocations may not be accounted.") \
M(MemoryTracking, "Total amount of memory (bytes) allocated by the server.") \
M(MemoryTrackingInBackgroundProcessingPool, "Total amount of memory (bytes) allocated in background processing pool (that is dedicated for backround merges, mutations and fetches). Note that this value may include a drift when the memory was allocated in a context of background processing pool and freed in other context or vice-versa. This happens naturally due to caches for tables indexes and doesn't indicate memory leaks.") \
M(MemoryTrackingInBackgroundMoveProcessingPool, "Total amount of memory (bytes) allocated in background processing pool (that is dedicated for backround moves). Note that this value may include a drift when the memory was allocated in a context of background processing pool and freed in other context or vice-versa. This happens naturally due to caches for tables indexes and doesn't indicate memory leaks.") \
M(MemoryTrackingInBackgroundSchedulePool, "Total amount of memory (bytes) allocated in background schedule pool (that is dedicated for bookkeeping tasks of Replicated tables).") \

View File

@ -0,0 +1,90 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <cassert>
#include "MemoryStatisticsOS.h"
#include <Common/Exception.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/ReadHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int FILE_DOESNT_EXIST;
extern const int CANNOT_OPEN_FILE;
extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
}
static constexpr auto filename = "/proc/self/statm";
static constexpr size_t PAGE_SIZE = 4096;
MemoryStatisticsOS::MemoryStatisticsOS()
{
fd = ::open(filename, O_RDONLY | O_CLOEXEC);
if (-1 == fd)
throwFromErrno("Cannot open file " + std::string(filename), errno == ENOENT ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE);
}
MemoryStatisticsOS::~MemoryStatisticsOS()
{
if (0 != ::close(fd))
tryLogCurrentException(__PRETTY_FUNCTION__);
}
MemoryStatisticsOS::Data MemoryStatisticsOS::get() const
{
Data data;
constexpr size_t buf_size = 1024;
char buf[buf_size];
ssize_t res = 0;
do
{
res = ::pread(fd, buf, buf_size, 0);
if (-1 == res)
{
if (errno == EINTR)
continue;
throwFromErrno("Cannot read from file " + std::string(filename), ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR);
}
assert(res >= 0);
break;
} while (true);
ReadBufferFromMemory in(buf, res);
uint64_t unused;
readIntText(data.virt, in);
skipWhitespaceIfAny(in);
readIntText(data.resident, in);
skipWhitespaceIfAny(in);
readIntText(data.shared, in);
skipWhitespaceIfAny(in);
readIntText(data.code, in);
skipWhitespaceIfAny(in);
readIntText(unused, in);
skipWhitespaceIfAny(in);
readIntText(data.data_and_stack, in);
data.virt *= PAGE_SIZE;
data.resident *= PAGE_SIZE;
data.shared *= PAGE_SIZE;
data.code *= PAGE_SIZE;
data.data_and_stack *= PAGE_SIZE;
return data;
}
}

View File

@ -0,0 +1,40 @@
#pragma once
#include <cstdint>
namespace DB
{
/** Opens a file /proc/self/mstat. Keeps it open and reads memory statistics via 'pread'.
* This is Linux specific.
* See: man procfs
*
* Note: a class is used instead of a single function to avoid excessive file open/close on every use.
* pread is used to avoid lseek.
*
* Actual performance is from 1 to 5 million iterations per second.
*/
class MemoryStatisticsOS
{
public:
/// In number of bytes.
struct Data
{
uint64_t virt;
uint64_t resident;
uint64_t shared;
uint64_t code;
uint64_t data_and_stack;
};
MemoryStatisticsOS();
~MemoryStatisticsOS();
/// Thread-safe.
Data get() const;
private:
int fd;
};
}

View File

@ -25,10 +25,16 @@ static constexpr size_t log_peak_memory_usage_every = 1ULL << 30;
/// Each thread could new/delete memory in range of (-untracked_memory_limit, untracked_memory_limit) without access to common counters.
static constexpr Int64 untracked_memory_limit = 4 * 1024 * 1024;
MemoryTracker total_memory_tracker(nullptr, VariableContext::Global);
MemoryTracker::MemoryTracker(VariableContext level_) : parent(&total_memory_tracker), level(level_) {}
MemoryTracker::MemoryTracker(MemoryTracker * parent_, VariableContext level_) : parent(parent_), level(level_) {}
MemoryTracker::~MemoryTracker()
{
if (static_cast<int>(level) < static_cast<int>(VariableContext::Process) && peak)
if ((level == VariableContext::Process || level == VariableContext::User) && peak)
{
try
{
@ -39,19 +45,6 @@ MemoryTracker::~MemoryTracker()
/// Exception in Logger, intentionally swallow.
}
}
/** This is needed for next memory tracker to be consistent with sum of all referring memory trackers.
*
* Sometimes, memory tracker could be destroyed before memory was freed, and on destruction, amount > 0.
* For example, a query could allocate some data and leave it in cache.
*
* If memory will be freed outside of context of this memory tracker,
* but in context of one of the 'next' memory trackers,
* then memory usage of 'next' memory trackers will be underestimated,
* because amount will be decreased twice (first - here, second - when real 'free' happens).
*/
if (auto value = amount.load(std::memory_order_relaxed))
free(value);
}
@ -62,10 +55,11 @@ void MemoryTracker::logPeakMemoryUsage() const
<< ": " << formatReadableSizeWithBinarySuffix(peak) << ".");
}
static void logMemoryUsage(Int64 amount)
void MemoryTracker::logMemoryUsage(Int64 current) const
{
LOG_DEBUG(&Logger::get("MemoryTracker"),
"Current memory usage: " << formatReadableSizeWithBinarySuffix(amount) << ".");
"Current memory usage" << (description ? " " + std::string(description) : "")
<< ": " << formatReadableSizeWithBinarySuffix(current) << ".");
}
@ -131,17 +125,24 @@ void MemoryTracker::alloc(Int64 size)
throw DB::Exception(message.str(), DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED);
}
updatePeak(will_be);
if (auto loaded_next = parent.load(std::memory_order_relaxed))
loaded_next->alloc(size);
}
void MemoryTracker::updatePeak(Int64 will_be)
{
auto peak_old = peak.load(std::memory_order_relaxed);
if (will_be > peak_old) /// Races doesn't matter. Could rewrite with CAS, but not worth.
{
peak.store(will_be, std::memory_order_relaxed);
if (level == VariableContext::Process && will_be / log_peak_memory_usage_every > peak_old / log_peak_memory_usage_every)
if ((level == VariableContext::Process || level == VariableContext::Global)
&& will_be / log_peak_memory_usage_every > peak_old / log_peak_memory_usage_every)
logMemoryUsage(will_be);
}
if (auto loaded_next = parent.load(std::memory_order_relaxed))
loaded_next->alloc(size);
}
@ -198,6 +199,13 @@ void MemoryTracker::reset()
}
void MemoryTracker::set(Int64 to)
{
amount.store(to, std::memory_order_relaxed);
updatePeak(to);
}
void MemoryTracker::setOrRaiseHardLimit(Int64 value)
{
/// This is just atomic set to maximum.

View File

@ -13,6 +13,7 @@
*/
class MemoryTracker
{
private:
std::atomic<Int64> amount {0};
std::atomic<Int64> peak {0};
std::atomic<Int64> hard_limit {0};
@ -33,9 +34,12 @@ class MemoryTracker
/// This description will be used as prefix into log messages (if isn't nullptr)
const char * description = nullptr;
void updatePeak(Int64 will_be);
void logMemoryUsage(Int64 current) const;
public:
MemoryTracker(VariableContext level_ = VariableContext::Thread) : level(level_) {}
MemoryTracker(MemoryTracker * parent_, VariableContext level_ = VariableContext::Thread) : parent(parent_), level(level_) {}
MemoryTracker(VariableContext level_ = VariableContext::Thread);
MemoryTracker(MemoryTracker * parent_, VariableContext level_ = VariableContext::Thread);
~MemoryTracker();
@ -113,6 +117,9 @@ public:
/// Reset the accumulated data and the parent.
void reset();
/// Reset current counter to a new value.
void set(Int64 to);
/// Prints info about peak memory consumption into log.
void logPeakMemoryUsage() const;
@ -120,6 +127,8 @@ public:
DB::SimpleActionBlocker blocker;
};
extern MemoryTracker total_memory_tracker;
/// Convenience methods, that use current thread's memory_tracker if it is available.
namespace CurrentMemoryTracker

View File

@ -68,3 +68,6 @@ target_link_libraries (symbol_index PRIVATE clickhouse_common_io)
add_executable (chaos_sanitizer chaos_sanitizer.cpp)
target_link_libraries (chaos_sanitizer PRIVATE clickhouse_common_io)
add_executable (memory_statistics_os_perf memory_statistics_os_perf.cpp)
target_link_libraries (memory_statistics_os_perf PRIVATE clickhouse_common_io)

View File

@ -0,0 +1,23 @@
#include <Common/MemoryStatisticsOS.h>
#include <iostream>
int main(int argc, char ** argv)
{
using namespace DB;
size_t num_iterations = argc >= 2 ? std::stoull(argv[1]) : 1000000;
MemoryStatisticsOS stats;
uint64_t counter = 0;
for (size_t i = 0; i < num_iterations; ++i)
{
MemoryStatisticsOS::Data data = stats.get();
counter += data.resident;
}
std::cerr << (counter / num_iterations) << '\n';
return 0;
}

View File

@ -339,7 +339,7 @@ struct Settings : public SettingsCollection<Settings>
\
M(SettingUInt64, max_memory_usage, 0, "Maximum memory usage for processing of single query. Zero means unlimited.", 0) \
M(SettingUInt64, max_memory_usage_for_user, 0, "Maximum memory usage for processing all concurrently running queries for the user. Zero means unlimited.", 0) \
M(SettingUInt64, max_memory_usage_for_all_queries, 0, "Maximum memory usage for processing all concurrently running queries on the server. Zero means unlimited.", 0) \
M(SettingUInt64, max_server_memory_usage, 0, "Maximum memory usage for server. Only has meaning at server startup. It can be specified only for default profile.", 0) \
M(SettingUInt64, memory_profiler_step, 0, "Every number of bytes the memory profiler will collect the allocating stack trace. The minimal effective step is 4 MiB (less values will work as clamped to 4 MiB). Zero means disabled memory profiler.", 0) \
\
M(SettingUInt64, max_network_bandwidth, 0, "The maximum speed of data exchange over the network in bytes per second for a query. Zero means unlimited.", 0) \
@ -423,6 +423,8 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, merge_tree_uniform_read_distribution, true, "Obsolete setting, does nothing. Will be removed after 2020-05-20", 0) \
M(SettingUInt64, mark_cache_min_lifetime, 0, "Obsolete setting, does nothing. Will be removed after 2020-05-31", 0) \
M(SettingBool, partial_merge_join, false, "Obsolete. Use join_algorithm='prefer_partial_merge' instead.", 0) \
M(SettingUInt64, max_memory_usage_for_all_queries, 0, "Obsolete. Will be removed after 2020-10-20", 0) \
DECLARE_SETTINGS_COLLECTION(LIST_OF_SETTINGS)

View File

@ -12,6 +12,7 @@
#include <Databases/IDatabase.h>
#include <chrono>
#if !defined(ARCADIA_BUILD)
# include "config_core.h"
#endif
@ -130,6 +131,24 @@ void AsynchronousMetrics::update()
set("Uptime", context.getUptimeSeconds());
/// Process memory usage according to OS
#if defined(OS_LINUX)
{
MemoryStatisticsOS::Data data = memory_stat.get();
set("MemoryVirtual", data.virt);
set("MemoryResident", data.resident);
set("MemoryShared", data.shared);
set("MemoryCode", data.code);
set("MemoryDataAndStack", data.data_and_stack);
/// We must update the value of total_memory_tracker periodically.
/// Otherwise it might be calculated incorrectly - it can include a "drift" of memory amount.
/// See https://github.com/ClickHouse/ClickHouse/issues/10293
total_memory_tracker.set(data.resident);
}
#endif
{
auto databases = DatabaseCatalog::instance().getDatabases();

View File

@ -6,6 +6,7 @@
#include <unordered_map>
#include <string>
#include <Common/ThreadPool.h>
#include <Common/MemoryStatisticsOS.h>
namespace DB
@ -44,6 +45,7 @@ private:
Container container;
mutable std::mutex container_mutex;
MemoryStatisticsOS memory_stat;
ThreadFromGlobalPool thread;
void run();

View File

@ -23,13 +23,10 @@ Context removeUserRestrictionsFromSettings(const Context & context, const Settin
/// Does not matter on remote servers, because queries are sent under different user.
new_settings.max_concurrent_queries_for_user = 0;
new_settings.max_memory_usage_for_user = 0;
/// This setting is really not for user and should not be sent to remote server.
new_settings.max_memory_usage_for_all_queries = 0;
/// Set as unchanged to avoid sending to remote server.
new_settings.max_concurrent_queries_for_user.changed = false;
new_settings.max_memory_usage_for_user.changed = false;
new_settings.max_memory_usage_for_all_queries.changed = false;
if (settings.force_optimize_skip_unused_shards_no_nested)
{

View File

@ -14,12 +14,6 @@
#include <chrono>
namespace CurrentMetrics
{
extern const Metric MemoryTracking;
}
namespace DB
{
@ -68,7 +62,6 @@ static bool isUnlimitedQuery(const IAST * ast)
ProcessList::ProcessList(size_t max_size_)
: max_size(max_size_)
{
total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking);
}
@ -171,19 +164,7 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as
process_it->setUserProcessList(&user_process_list);
/// Limits are only raised (to be more relaxed) or set to something instead of zero,
/// because settings for different queries will interfere each other:
/// setting from one query effectively sets values for all other queries.
/// Track memory usage for all simultaneously running queries.
/// You should specify this value in configuration for default profile,
/// not for specific users, sessions or queries,
/// because this setting is effectively global.
total_memory_tracker.setOrRaiseHardLimit(settings.max_memory_usage_for_all_queries);
total_memory_tracker.setDescription("(total)");
/// Track memory usage for all simultaneously running queries from single user.
user_process_list.user_memory_tracker.setParent(&total_memory_tracker);
user_process_list.user_memory_tracker.setOrRaiseHardLimit(settings.max_memory_usage_for_user);
user_process_list.user_memory_tracker.setDescription("(for user)");
@ -280,14 +261,9 @@ ProcessListEntry::~ProcessListEntry()
if (user_process_list.queries.empty())
user_process_list.resetTrackers();
/// This removes memory_tracker for all requests. At this time, no other memory_trackers live.
/// Reset throttler, similarly (see above).
if (parent.processes.empty())
{
/// Reset MemoryTracker, similarly (see above).
parent.total_memory_tracker.logPeakMemoryUsage();
parent.total_memory_tracker.reset();
parent.total_network_throttler.reset();
}
}

View File

@ -295,9 +295,6 @@ protected:
/// Stores info about queries grouped by their priority
QueryPriorities priorities;
/// Limit and counter for memory of all simultaneously running queries.
MemoryTracker total_memory_tracker{VariableContext::Global};
/// Limit network bandwidth for all users
ThrottlerPtr total_network_throttler;

View File

@ -96,6 +96,14 @@ SystemLogs::SystemLogs(Context & global_context, const Poco::Util::AbstractConfi
logs.emplace_back(text_log.get());
if (metric_log)
logs.emplace_back(metric_log.get());
bool lazy_load = config.getBool("system_tables_lazy_load", true);
for (auto & log : logs)
{
if (!lazy_load)
log->prepareTable();
log->startup();
}
}

View File

@ -73,6 +73,8 @@ public:
virtual String getName() = 0;
virtual ASTPtr getCreateTableQuery() = 0;
virtual void flush() = 0;
virtual void prepareTable() = 0;
virtual void startup() = 0;
virtual void shutdown() = 0;
virtual ~ISystemLog() = default;
};
@ -129,6 +131,9 @@ public:
/// Flush data in the buffer to disk
void flush() override;
/// Start the background thread.
void startup() override;
/// Stop the background flush thread before destructor. No more data will be written.
void shutdown() override
{
@ -177,7 +182,7 @@ private:
* Renames old table if its structure is not suitable.
* This cannot be done in constructor to avoid deadlock while renaming a table under locked Context when SystemLog object is created.
*/
void prepareTable();
void prepareTable() override;
/// flushImpl can be executed only in saving_thread.
void flushImpl(const std::vector<LogElement> & to_flush, uint64_t to_flush_end);
@ -197,7 +202,12 @@ SystemLog<LogElement>::SystemLog(Context & context_,
{
assert(database_name_ == DatabaseCatalog::SYSTEM_DATABASE);
log = &Logger::get("SystemLog (" + database_name_ + "." + table_name_ + ")");
}
template <typename LogElement>
void SystemLog<LogElement>::startup()
{
saving_thread = ThreadFromGlobalPool([this] { savingThreadFunction(); });
}

View File

@ -94,15 +94,11 @@ namespace ErrorCodes
extern const int CORRUPTED_DATA;
extern const int BAD_TYPE_OF_FIELD;
extern const int BAD_ARGUMENTS;
extern const int MEMORY_LIMIT_EXCEEDED;
extern const int INVALID_PARTITION_VALUE;
extern const int METADATA_MISMATCH;
extern const int PART_IS_TEMPORARILY_LOCKED;
extern const int TOO_MANY_PARTS;
extern const int INCOMPATIBLE_COLUMNS;
extern const int CANNOT_ALLOCATE_MEMORY;
extern const int CANNOT_MUNMAP;
extern const int CANNOT_MREMAP;
extern const int BAD_TTL_EXPRESSION;
extern const int INCORRECT_FILE_NAME;
extern const int BAD_DATA_PART_NAME;
@ -954,10 +950,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
/// Don't count the part as broken if there is not enough memory to load it.
/// In fact, there can be many similar situations.
/// But it is OK, because there is a safety guard against deleting too many parts.
if (e.code() == ErrorCodes::MEMORY_LIMIT_EXCEEDED
|| e.code() == ErrorCodes::CANNOT_ALLOCATE_MEMORY
|| e.code() == ErrorCodes::CANNOT_MUNMAP
|| e.code() == ErrorCodes::CANNOT_MREMAP)
if (isNotEnoughMemoryErrorCode(e.code()))
throw;
broken = true;

View File

@ -27,6 +27,8 @@ MergeTreeSequentialBlockInputStream::MergeTreeSequentialBlockInputStream(
message << "Reading " << data_part->getMarksCount() << " marks from part " << data_part->name
<< ", total " << data_part->rows_count
<< " rows starting from the beginning of the part";
if (columns_to_read.size() == 1) /// Print column name but don't pollute logs in case of many columns.
message << ", column " << columns_to_read.front();
LOG_TRACE(log, message.rdbuf());
}

View File

@ -248,9 +248,13 @@ CheckResult ReplicatedMergeTreePartCheckThread::checkPart(const String & part_na
LOG_INFO(log, "Part " << part_name << " looks good.");
}
catch (const Exception &)
catch (const Exception & e)
{
/// TODO Better to check error code.
/// Don't count the part as broken if there is not enough memory to load it.
/// In fact, there can be many similar situations.
/// But it is OK, because there is a safety guard against deleting too many parts.
if (isNotEnoughMemoryErrorCode(e.code()))
throw;
tryLogCurrentException(log, __PRETTY_FUNCTION__);

View File

@ -24,6 +24,22 @@ namespace ErrorCodes
{
extern const int CORRUPTED_DATA;
extern const int UNKNOWN_PART_TYPE;
extern const int MEMORY_LIMIT_EXCEEDED;
extern const int CANNOT_ALLOCATE_MEMORY;
extern const int CANNOT_MUNMAP;
extern const int CANNOT_MREMAP;
}
bool isNotEnoughMemoryErrorCode(int code)
{
/// Don't count the part as broken if there is not enough memory to load it.
/// In fact, there can be many similar situations.
/// But it is OK, because there is a safety guard against deleting too many parts.
return code == ErrorCodes::MEMORY_LIMIT_EXCEEDED
|| code == ErrorCodes::CANNOT_ALLOCATE_MEMORY
|| code == ErrorCodes::CANNOT_MUNMAP
|| code == ErrorCodes::CANNOT_MREMAP;
}

View File

@ -19,4 +19,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
const MergeTreeDataPartType & part_type,
bool require_checksums,
std::function<bool()> is_cancelled = []{ return false; });
bool isNotEnoughMemoryErrorCode(int code);
}

View File

@ -17,6 +17,7 @@
#include <Common/Macros.h>
#include <Common/escapeForFileName.h>
#include <Common/typeid_cast.h>
#include <Common/quoteString.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTExpressionList.h>
@ -73,6 +74,7 @@ namespace ErrorCodes
extern const int NO_SUCH_COLUMN_IN_TABLE;
extern const int TOO_MANY_ROWS;
extern const int UNABLE_TO_SKIP_UNUSED_SHARDS;
extern const int LOGICAL_ERROR;
}
namespace ActionLocks
@ -378,8 +380,77 @@ StoragePtr StorageDistributed::createWithOwnCluster(
}
QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Context &context, QueryProcessingStage::Enum to_stage, const ASTPtr &query_ptr) const
bool StorageDistributed::canForceGroupByNoMerge(const Context &context, QueryProcessingStage::Enum to_stage, const ASTPtr & query_ptr) const
{
const auto & settings = context.getSettingsRef();
std::string reason;
if (settings.distributed_group_by_no_merge)
return true;
/// Distributed-over-Distributed (see getQueryProcessingStageImpl())
if (to_stage == QueryProcessingStage::WithMergeableState)
return false;
if (!settings.optimize_skip_unused_shards)
return false;
if (!has_sharding_key)
return false;
const auto & select = query_ptr->as<ASTSelectQuery &>();
if (select.orderBy())
return false;
if (select.distinct)
{
for (auto & expr : select.select()->children)
{
auto id = expr->as<ASTIdentifier>();
if (!id)
return false;
if (!sharding_key_expr->getSampleBlock().has(id->name))
return false;
}
reason = "DISTINCT " + backQuote(serializeAST(*select.select(), true));
}
// This can use distributed_group_by_no_merge but in this case limit stage
// should be done later (which is not the case right now).
if (select.limitBy() || select.limitLength())
return false;
const ASTPtr group_by = select.groupBy();
if (!group_by)
{
if (!select.distinct)
return false;
}
else
{
// injective functions are optimized out in optimizeGroupBy()
// hence all we need to check is that column in GROUP BY matches sharding expression
auto & group_exprs = group_by->children;
if (group_exprs.empty())
throw Exception("No ASTExpressionList in GROUP BY", ErrorCodes::LOGICAL_ERROR);
auto id = group_exprs[0]->as<ASTIdentifier>();
if (!id)
return false;
if (!sharding_key_expr->getSampleBlock().has(id->name))
return false;
reason = "GROUP BY " + backQuote(serializeAST(*group_by, true));
}
LOG_DEBUG(log, "Force distributed_group_by_no_merge for " << reason << " (injective)");
return true;
}
QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Context &context, QueryProcessingStage::Enum to_stage, const ASTPtr & query_ptr) const
{
if (canForceGroupByNoMerge(context, to_stage, query_ptr))
return QueryProcessingStage::Complete;
auto cluster = getOptimizedCluster(context, query_ptr);
return getQueryProcessingStageImpl(context, to_stage, cluster);
}

View File

@ -67,7 +67,9 @@ public:
bool isRemote() const override { return true; }
QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum /*to_stage*/, const ASTPtr &) const override;
/// Return true if distributed_group_by_no_merge may be applied.
bool canForceGroupByNoMerge(const Context &, QueryProcessingStage::Enum to_stage, const ASTPtr &) const;
QueryProcessingStage::Enum getQueryProcessingStage(const Context &, QueryProcessingStage::Enum to_stage, const ASTPtr &) const override;
Pipes read(
const Names & column_names,

View File

@ -1274,7 +1274,8 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry)
++total_fetches;
SCOPE_EXIT({--total_fetches;});
if (storage_settings_ptr->replicated_max_parallel_fetches_for_table && current_table_fetches >= storage_settings_ptr->replicated_max_parallel_fetches_for_table)
if (storage_settings_ptr->replicated_max_parallel_fetches_for_table
&& current_table_fetches >= storage_settings_ptr->replicated_max_parallel_fetches_for_table)
{
throw Exception("Too many fetches from replicas for table, maximum: " + storage_settings_ptr->replicated_max_parallel_fetches_for_table.toString(),
ErrorCodes::TOO_MANY_FETCHES);
@ -1416,7 +1417,7 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry)
}
catch (...)
{
/** If you can not download the part you need for some merge, it's better not to try to get other parts for this merge,
/** If we can not download the part we need for some merge, it's better not to try to get other parts for this merge,
* but try to get already merged part. To do this, move the action to get the remaining parts
* for this merge at the end of the queue.
*/
@ -3306,7 +3307,6 @@ void StorageReplicatedMergeTree::alter(
alter_entry->alter_version = new_metadata_version;
alter_entry->create_time = time(nullptr);
auto maybe_mutation_commands = params.getMutationCommands(current_metadata);
alter_entry->have_mutation = !maybe_mutation_commands.empty();

View File

@ -0,0 +1,4 @@
<?xml version="1.0"?>
<yandex>
<system_tables_lazy_load>false</system_tables_lazy_load>
</yandex>

View File

@ -0,0 +1,24 @@
import time
import pytest
import os
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', config_dir="configs")
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_system_tables_non_lazy_load(start_cluster):
assert node1.query_and_get_error("SELECT * FROM system.part_log") == ""
assert node1.query_and_get_error("SELECT * FROM system.query_log") == ""
assert node1.query_and_get_error("SELECT * FROM system.query_thread_log") == ""
assert node1.query_and_get_error("SELECT * FROM system.text_log") == ""
assert node1.query_and_get_error("SELECT * FROM system.trace_log") == ""
assert node1.query_and_get_error("SELECT * FROM system.metric_log") == ""

View File

@ -23,7 +23,7 @@ with client(name='client1>', log=log) as client1, client(name='client2>', log=lo
client1.send('DROP TABLE IF EXISTS test.lv')
client1.expect(prompt)
client1.send(' DROP TABLE IF EXISTS test.mt')
client1.send('DROP TABLE IF EXISTS test.mt')
client1.expect(prompt)
client1.send('SET live_view_heartbeat_interval=1')
client1.expect(prompt)
@ -31,14 +31,16 @@ with client(name='client1>', log=log) as client1, client(name='client2>', log=lo
client1.expect(prompt)
client1.send('CREATE LIVE VIEW test.lv WITH TIMEOUT AS SELECT sum(a) FROM test.mt')
client1.expect(prompt)
client1.send('WATCH test.lv EVENTS')
client1.send('WATCH test.lv EVENTS FORMAT CSV')
client1.expect('Progress: 1.00 rows.*\)')
client2.send('INSERT INTO test.mt VALUES (1)')
client1.expect('1.*' + end_of_block)
client2.expect(prompt)
client1.expect('Progress: 2.00 rows.*\)')
client2.send('INSERT INTO test.mt VALUES (2),(3)')
client1.expect('[23].*' + end_of_block)
client1.expect('Progress: [23]\.00 rows.*\)')
client2.expect(prompt)
client1.expect('Progress: 3.00 rows.*\)')
# wait for heartbeat
client1.expect('Progress: [23]\.00 rows.*\)')
client1.expect('Progress: 3.00 rows.*\)')
# send Ctrl-C
client1.send('\x03', eol='')
match = client1.expect('(%s)|([#\$] )' % prompt)

View File

@ -6,3 +6,5 @@ optimize_skip_unused_shards
optimize_skip_unused_shards lack of WHERE
0
1
0
1

View File

@ -12,6 +12,7 @@ SELECT DISTINCT id FROM dist_01213 WHERE id = 1 SETTINGS distributed_group_by_no
SELECT 'optimize_skip_unused_shards';
SELECT DISTINCT id FROM dist_01213 WHERE id = 1 SETTINGS optimize_skip_unused_shards=1;
-- check that querying all shards is ok
-- (there will be duplicates, since the INSERT was done via local table)
SELECT 'optimize_skip_unused_shards lack of WHERE';
SELECT DISTINCT id FROM dist_01213 SETTINGS optimize_skip_unused_shards=1;

View File

@ -0,0 +1,69 @@
-
0
1
0
1
optimize_skip_unused_shards
0
1
0
1
GROUP BY number
1 0
1 1
1 0
1 1
GROUP BY number distributed_group_by_no_merge
1 0
1 1
1 0
1 1
GROUP BY number, 1
1 0
1 1
1 0
1 1
GROUP BY 1
4 0
GROUP BY number ORDER BY number DESC
2 1
2 0
GROUP BY toString(number)
1 0
1 1
1 0
1 1
GROUP BY number%2
2 0
2 1
countDistinct
2
countDistinct GROUP BY number
1
1
1
1
DISTINCT
0
1
0
1
HAVING
LIMIT
2 0
2 1
LIMIT BY
2 0
2 1
GROUP BY (Distributed-over-Distributed)
4 0
4 1
GROUP BY (Distributed-over-Distributed) distributed_group_by_no_merge
1 0
1 1
1 0
1 1
1 0
1 1
1 0
1 1

View File

@ -0,0 +1,62 @@
drop table if exists dist_01247;
drop table if exists data_01247;
create table data_01247 as system.numbers engine=Memory();
insert into data_01247 select * from system.numbers limit 2;
create table dist_01247 as data_01247 engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01247, number);
-- since data is not inserted via distributed it will have duplicates
-- (and this is how we ensure that this optimization will work)
set max_distributed_connections=1;
select '-';
select * from dist_01247;
select 'optimize_skip_unused_shards';
set optimize_skip_unused_shards=1;
select * from dist_01247;
select 'GROUP BY number';
select count(), * from dist_01247 group by number;
select 'GROUP BY number distributed_group_by_no_merge';
select count(), * from dist_01247 group by number settings distributed_group_by_no_merge=1;
-- dumb, but should work, since "GROUP BY 1" optimized out
select 'GROUP BY number, 1';
select count(), * from dist_01247 group by number, 1;
select 'GROUP BY 1';
select count(), min(number) from dist_01247 group by 1;
select 'GROUP BY number ORDER BY number DESC';
select count(), * from dist_01247 group by number order by number desc;
select 'GROUP BY toString(number)';
select count(), * from dist_01247 group by toString(number);
select 'GROUP BY number%2';
select count(), any(number) from dist_01247 group by number%2;
select 'countDistinct';
select count(DISTINCT number) from dist_01247;
select 'countDistinct GROUP BY number';
select count(DISTINCT number) from dist_01247 group by number;
select 'DISTINCT';
select DISTINCT number from dist_01247;
select 'HAVING';
select count() cnt, * from dist_01247 group by number having cnt < 0;
select 'LIMIT';
select count(), * from dist_01247 group by number limit 1;
select count(), * from dist_01247 group by number limit 1 offset 1;
select 'LIMIT BY';
select count(), * from dist_01247 group by number limit 0 by number;
select count(), * from dist_01247 group by number limit 1 by number;
select 'GROUP BY (Distributed-over-Distributed)';
select count(), * from cluster(test_cluster_two_shards, currentDatabase(), dist_01247) group by number;
select 'GROUP BY (Distributed-over-Distributed) distributed_group_by_no_merge';
select count(), * from cluster(test_cluster_two_shards, currentDatabase(), dist_01247) group by number settings distributed_group_by_no_merge=1;