Merge remote-tracking branch 'origin/master' into pr-right-joins

Igor Nikonov 2024-10-29 12:18:02 +00:00
commit 05c9ba1215
66 changed files with 622 additions and 58 deletions

View File

@ -25,9 +25,10 @@
// We don't have libc struct available here.
// Compute aux vector manually (from /proc/self/auxv).
//
// Right now there is only 51 AT_* constants,
// so 64 should be enough until this implementation will be replaced with musl.
static unsigned long __auxv_procfs[64];
// Right now there are 51 AT_* constants. Custom kernels have been encountered
// making use of up to 71. 128 should be enough until this implementation is
// replaced with musl.
static unsigned long __auxv_procfs[128];
static unsigned long __auxv_secure = 0;
// Common
static unsigned long * __auxv_environ = NULL;

View File

@ -5,7 +5,7 @@ sidebar_label: JSON
keywords: [json, data type]
---
# JSON
# JSON Data Type
Stores JavaScript Object Notation (JSON) documents in a single column.
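A minimal illustrative sketch of using the type (table name is hypothetical; some releases gate the type behind the `allow_experimental_json_type` setting):

```sql
-- May be required on releases where the JSON type is still experimental.
SET allow_experimental_json_type = 1;

CREATE TABLE json_demo
(
    doc JSON
)
ENGINE = MergeTree
ORDER BY tuple();

INSERT INTO json_demo VALUES ('{"user": {"id": 42, "name": "alice"}}');

-- Paths inside the document can be read back with dot notation.
SELECT doc.user.name FROM json_demo;
```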

View File

@ -185,6 +185,7 @@ Examples:
- `CREATE USER name1 VALID UNTIL '2025-01-01'`
- `CREATE USER name1 VALID UNTIL '2025-01-01 12:00:00 UTC'`
- `CREATE USER name1 VALID UNTIL 'infinity'`
- ```CREATE USER name1 VALID UNTIL '2025-01-01 12:00:00 `Asia/Tokyo`'```
## GRANTEES Clause

View File

@ -168,6 +168,7 @@ namespace ServerSetting
{
extern const ServerSettingsUInt32 asynchronous_heavy_metrics_update_period_s;
extern const ServerSettingsUInt32 asynchronous_metrics_update_period_s;
extern const ServerSettingsBool asynchronous_metrics_enable_heavy_metrics;
extern const ServerSettingsBool async_insert_queue_flush_on_shutdown;
extern const ServerSettingsUInt64 async_insert_threads;
extern const ServerSettingsBool async_load_databases;
@ -1061,6 +1062,7 @@ try
ServerAsynchronousMetrics async_metrics(
global_context,
server_settings[ServerSetting::asynchronous_metrics_update_period_s],
server_settings[ServerSetting::asynchronous_metrics_enable_heavy_metrics],
server_settings[ServerSetting::asynchronous_heavy_metrics_update_period_s],
[&]() -> std::vector<ProtocolServerMetrics>
{

View File

@ -159,6 +159,7 @@ enum class AccessType : uint8_t
M(SYSTEM_SHUTDOWN, "SYSTEM KILL, SHUTDOWN", GLOBAL, SYSTEM) \
M(SYSTEM_DROP_DNS_CACHE, "SYSTEM DROP DNS, DROP DNS CACHE, DROP DNS", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_CONNECTIONS_CACHE, "SYSTEM DROP CONNECTIONS CACHE, DROP CONNECTIONS CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_PREWARM_MARK_CACHE, "SYSTEM PREWARM MARK, PREWARM MARK CACHE, PREWARM MARKS", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_MARK_CACHE, "SYSTEM DROP MARK, DROP MARK CACHE, DROP MARKS", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_UNCOMPRESSED_CACHE, "SYSTEM DROP UNCOMPRESSED, DROP UNCOMPRESSED CACHE, DROP UNCOMPRESSED", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_MMAP_CACHE, "SYSTEM DROP MMAP, DROP MMAP CACHE, DROP MMAP", GLOBAL, SYSTEM_DROP_CACHE) \

View File

@ -58,6 +58,7 @@ namespace DB
DECLARE(Double, cannot_allocate_thread_fault_injection_probability, 0, "For testing purposes.", 0) \
DECLARE(Int32, max_connections, 1024, "Max server connections.", 0) \
DECLARE(UInt32, asynchronous_metrics_update_period_s, 1, "Period in seconds for updating asynchronous metrics.", 0) \
DECLARE(Bool, asynchronous_metrics_enable_heavy_metrics, false, "Enable the calculation of heavy asynchronous metrics.", 0) \
DECLARE(UInt32, asynchronous_heavy_metrics_update_period_s, 120, "Period in seconds for updating heavy asynchronous metrics.", 0) \
DECLARE(String, default_database, "default", "Default database name.", 0) \
DECLARE(String, tmp_policy, "", "Policy for storage with temporary data.", 0) \
@ -99,6 +100,7 @@ namespace DB
DECLARE(String, mark_cache_policy, DEFAULT_MARK_CACHE_POLICY, "Mark cache policy name.", 0) \
DECLARE(UInt64, mark_cache_size, DEFAULT_MARK_CACHE_MAX_SIZE, "Size of cache for marks (index of MergeTree family of tables).", 0) \
DECLARE(Double, mark_cache_size_ratio, DEFAULT_MARK_CACHE_SIZE_RATIO, "The size of the protected queue in the mark cache relative to the cache's total size.", 0) \
DECLARE(Double, mark_cache_prewarm_ratio, 0.95, "The ratio of total size of mark cache to fill during prewarm.", 0) \
DECLARE(String, index_uncompressed_cache_policy, DEFAULT_INDEX_UNCOMPRESSED_CACHE_POLICY, "Secondary index uncompressed cache policy name.", 0) \
DECLARE(UInt64, index_uncompressed_cache_size, DEFAULT_INDEX_UNCOMPRESSED_CACHE_MAX_SIZE, "Size of cache for uncompressed blocks of secondary indices. Zero means disabled.", 0) \
DECLARE(Double, index_uncompressed_cache_size_ratio, DEFAULT_INDEX_UNCOMPRESSED_CACHE_SIZE_RATIO, "The size of the protected queue in the secondary index uncompressed cache relative to the cache's total size.", 0) \

View File

@ -108,7 +108,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"input_format_orc_dictionary_as_low_cardinality", false, true, "Treat ORC dictionary encoded columns as LowCardinality columns while reading ORC files"},
{"allow_experimental_refreshable_materialized_view", false, true, "Not experimental anymore"},
{"max_parts_to_move", 0, 1000, "New setting"},
{"hnsw_candidate_list_size_for_search", 0, 0, "New setting"},
{"hnsw_candidate_list_size_for_search", 64, 256, "New setting. Previously, the value was optionally specified in CREATE INDEX and 64 by default."},
{"allow_reorder_prewhere_conditions", false, true, "New setting"},
{"input_format_parquet_bloom_filter_push_down", false, true, "When reading Parquet files, skip whole row groups based on the WHERE/PREWHERE expressions and bloom filter in the Parquet metadata."},
{"date_time_64_output_format_cut_trailing_zeros_align_to_groups_of_thousands", false, false, "Dynamically trim the trailing zeros of datetime64 values to adjust the output scale to (0, 3, 6), corresponding to 'seconds', 'milliseconds', and 'microseconds'."}

View File

@ -119,4 +119,6 @@ private:
std::tuple<const BlockInfo *, size_t> lookUpMark(size_t idx) const;
};
using PlainMarksByName = std::unordered_map<String, std::unique_ptr<MarksInCompressedFile::PlainArray>>;
}

View File

@ -89,6 +89,9 @@ namespace CurrentMetrics
extern const Metric RestartReplicaThreads;
extern const Metric RestartReplicaThreadsActive;
extern const Metric RestartReplicaThreadsScheduled;
extern const Metric MergeTreePartsLoaderThreads;
extern const Metric MergeTreePartsLoaderThreadsActive;
extern const Metric MergeTreePartsLoaderThreadsScheduled;
}
namespace DB
@ -97,6 +100,7 @@ namespace Setting
{
extern const SettingsSeconds lock_acquire_timeout;
extern const SettingsSeconds receive_timeout;
extern const SettingsMaxThreads max_threads;
}
namespace ServerSetting
@ -359,6 +363,11 @@ BlockIO InterpreterSystemQuery::execute()
HTTPConnectionPools::instance().dropCache();
break;
}
case Type::PREWARM_MARK_CACHE:
{
prewarmMarkCache();
break;
}
case Type::DROP_MARK_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_MARK_CACHE);
system_context->clearMarkCache();
@ -1298,6 +1307,28 @@ RefreshTaskList InterpreterSystemQuery::getRefreshTasks()
return tasks;
}
void InterpreterSystemQuery::prewarmMarkCache()
{
if (table_id.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table is not specified for prewarming marks cache");
getContext()->checkAccess(AccessType::SYSTEM_PREWARM_MARK_CACHE, table_id);
auto table_ptr = DatabaseCatalog::instance().getTable(table_id, getContext());
auto * merge_tree = dynamic_cast<MergeTreeData *>(table_ptr.get());
if (!merge_tree)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Command PREWARM MARK CACHE is supported only for MergeTree table, but got: {}", table_ptr->getName());
ThreadPool pool(
CurrentMetrics::MergeTreePartsLoaderThreads,
CurrentMetrics::MergeTreePartsLoaderThreadsActive,
CurrentMetrics::MergeTreePartsLoaderThreadsScheduled,
getContext()->getSettingsRef()[Setting::max_threads]);
merge_tree->prewarmMarkCache(pool);
}
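As a usage sketch for the statement implemented above (mirroring the regression tests added later in this diff; the table name is illustrative), prewarming can be triggered and verified like this:

```sql
-- Requires the new SYSTEM PREWARM MARK CACHE privilege on the table.
SYSTEM DROP MARK CACHE;
SYSTEM PREWARM MARK CACHE t_prewarm_columns;

-- Subsequent reads should find the marks already cached.
SELECT count() FROM t_prewarm_columns WHERE NOT ignore(*);
```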
AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster() const
{
@ -1499,6 +1530,11 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
required_access.emplace_back(AccessType::SYSTEM_WAIT_LOADING_PARTS, query.getDatabase(), query.getTable());
break;
}
case Type::PREWARM_MARK_CACHE:
{
required_access.emplace_back(AccessType::SYSTEM_PREWARM_MARK_CACHE, query.getDatabase(), query.getTable());
break;
}
case Type::SYNC_DATABASE_REPLICA:
{
required_access.emplace_back(AccessType::SYSTEM_SYNC_DATABASE_REPLICA, query.getDatabase());

View File

@ -82,6 +82,7 @@ private:
AccessRightsElements getRequiredAccessForDDLOnCluster() const;
void startStopAction(StorageActionBlockType action_type, bool start);
void prewarmMarkCache();
void stopReplicatedDDLQueries();
void startReplicatedDDLQueries();

View File

@ -54,12 +54,14 @@ void calculateMaxAndSum(Max & max, Sum & sum, T x)
ServerAsynchronousMetrics::ServerAsynchronousMetrics(
ContextPtr global_context_,
unsigned update_period_seconds,
bool update_heavy_metrics_,
unsigned heavy_metrics_update_period_seconds,
const ProtocolServerMetricsFunc & protocol_server_metrics_func_,
bool update_jemalloc_epoch_,
bool update_rss_)
: WithContext(global_context_)
, AsynchronousMetrics(update_period_seconds, protocol_server_metrics_func_, update_jemalloc_epoch_, update_rss_)
, update_heavy_metrics(update_heavy_metrics_)
, heavy_metric_update_period(heavy_metrics_update_period_seconds)
{
/// sanity check
@ -412,7 +414,8 @@ void ServerAsynchronousMetrics::updateImpl(TimePoint update_time, TimePoint curr
}
#endif
updateHeavyMetricsIfNeeded(current_time, update_time, force_update, first_run, new_values);
if (update_heavy_metrics)
updateHeavyMetricsIfNeeded(current_time, update_time, force_update, first_run, new_values);
}
void ServerAsynchronousMetrics::logImpl(AsynchronousMetricValues & new_values)
@ -459,10 +462,10 @@ void ServerAsynchronousMetrics::updateDetachedPartsStats()
void ServerAsynchronousMetrics::updateHeavyMetricsIfNeeded(TimePoint current_time, TimePoint update_time, bool force_update, bool first_run, AsynchronousMetricValues & new_values)
{
const auto time_since_previous_update = current_time - heavy_metric_previous_update_time;
const bool update_heavy_metrics = (time_since_previous_update >= heavy_metric_update_period) || force_update || first_run;
const bool need_update_heavy_metrics = (time_since_previous_update >= heavy_metric_update_period) || force_update || first_run;
Stopwatch watch;
if (update_heavy_metrics)
if (need_update_heavy_metrics)
{
heavy_metric_previous_update_time = update_time;
if (first_run)

View File

@ -13,6 +13,7 @@ public:
ServerAsynchronousMetrics(
ContextPtr global_context_,
unsigned update_period_seconds,
bool update_heavy_metrics_,
unsigned heavy_metrics_update_period_seconds,
const ProtocolServerMetricsFunc & protocol_server_metrics_func_,
bool update_jemalloc_epoch_,
@ -24,6 +25,7 @@ private:
void updateImpl(TimePoint update_time, TimePoint current_time, bool force_update, bool first_run, AsynchronousMetricValues & new_values) override;
void logImpl(AsynchronousMetricValues & new_values) override;
bool update_heavy_metrics;
const Duration heavy_metric_update_period;
TimePoint heavy_metric_previous_update_time;
double heavy_update_interval = 0.;

View File

@ -298,9 +298,6 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf
#undef CREATE_PUBLIC_MEMBERS
/// NOLINTEND(bugprone-macro-parentheses)
if (session_log)
global_context->addWarningMessage("Table system.session_log is enabled. It's unreliable and may contain garbage. Do not use it for any kind of security monitoring.");
bool should_prepare = global_context->getServerSettings()[ServerSetting::prepare_system_log_tables_on_startup];
try
{

View File

@ -191,6 +191,7 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState & s
case Type::SYNC_REPLICA:
case Type::WAIT_LOADING_PARTS:
case Type::FLUSH_DISTRIBUTED:
case Type::PREWARM_MARK_CACHE:
{
if (table)
{

View File

@ -23,6 +23,7 @@ public:
SUSPEND,
DROP_DNS_CACHE,
DROP_CONNECTIONS_CACHE,
PREWARM_MARK_CACHE,
DROP_MARK_CACHE,
DROP_UNCOMPRESSED_CACHE,
DROP_INDEX_MARK_CACHE,

View File

@ -276,6 +276,7 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
case Type::RESTART_REPLICA:
case Type::SYNC_REPLICA:
case Type::WAIT_LOADING_PARTS:
case Type::PREWARM_MARK_CACHE:
{
if (!parseQueryWithOnCluster(res, pos, expected))
return false;

View File

@ -6,11 +6,24 @@
#include <Parsers/CommonParsers.h>
#include <Parsers/ExpressionListParsers.h>
#include <Common/typeid_cast.h>
#include <Core/Settings.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_PARSE_TEXT;
}
namespace Setting
{
extern const SettingsUInt64 max_query_size;
extern const SettingsUInt64 max_parser_depth;
extern const SettingsUInt64 max_parser_backtracks;
}
bool parseIdentifierOrStringLiteral(IParser::Pos & pos, Expected & expected, String & result)
{
return IParserBase::wrapParseImpl(pos, [&]
@ -54,4 +67,18 @@ bool parseIdentifiersOrStringLiterals(IParser::Pos & pos, Expected & expected, S
return true;
}
std::vector<String> parseIdentifiersOrStringLiterals(const String & str, const Settings & settings)
{
Tokens tokens(str.data(), str.data() + str.size(), settings[Setting::max_query_size]);
IParser::Pos pos(tokens, static_cast<unsigned>(settings[Setting::max_parser_depth]), static_cast<unsigned>(settings[Setting::max_parser_backtracks]));
Expected expected;
std::vector<String> res;
if (!parseIdentifiersOrStringLiterals(pos, expected, res))
throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "Cannot parse string ('{}') into vector of identifiers", str);
return res;
}
}

View File

@ -7,6 +7,8 @@
namespace DB
{
struct Settings;
/** Parses a name of an object which could be written in the following forms:
* name / `name` / "name" (identifier) or 'name'.
* Note that empty strings are not allowed.
@ -16,4 +18,7 @@ bool parseIdentifierOrStringLiteral(IParser::Pos & pos, Expected & expected, Str
/** Parse a list of identifiers or string literals. */
bool parseIdentifiersOrStringLiterals(IParser::Pos & pos, Expected & expected, Strings & result);
/** Parse a list of identifiers or string literals into vector of strings. */
std::vector<String> parseIdentifiersOrStringLiterals(const String & str, const Settings & settings);
}

View File

@ -180,6 +180,9 @@ public:
void loadRowsCountFileForUnexpectedPart();
/// Loads marks and saves them into mark cache for specified columns.
virtual void loadMarksToCache(const Names & column_names, MarkCache * mark_cache) const = 0;
String getMarksFileExtension() const { return index_granularity_info.mark_type.getFileExtension(); }
/// Generate the new name for this part according to `new_part_info` and min/max dates from the old name.

View File

@ -91,6 +91,13 @@ Columns IMergeTreeDataPartWriter::releaseIndexColumns()
return result;
}
PlainMarksByName IMergeTreeDataPartWriter::releaseCachedMarks()
{
PlainMarksByName res;
std::swap(cached_marks, res);
return res;
}
SerializationPtr IMergeTreeDataPartWriter::getSerialization(const String & column_name) const
{
auto it = serializations.find(column_name);

View File

@ -8,6 +8,7 @@
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/Statistics/Statistics.h>
#include <Storages/VirtualColumnsDescription.h>
#include <Formats/MarkInCompressedFile.h>
namespace DB
@ -46,6 +47,9 @@ public:
virtual void finish(bool sync) = 0;
Columns releaseIndexColumns();
PlainMarksByName releaseCachedMarks();
const MergeTreeIndexGranularity & getIndexGranularity() const { return index_granularity; }
protected:
@ -69,6 +73,8 @@ protected:
MutableDataPartStoragePtr data_part_storage;
MutableColumns index_columns;
MergeTreeIndexGranularity index_granularity;
/// Marks that will be saved to cache on finish.
PlainMarksByName cached_marks;
};
using MergeTreeDataPartWriterPtr = std::unique_ptr<IMergeTreeDataPartWriter>;

View File

@ -34,6 +34,11 @@ public:
return writer->getIndexGranularity();
}
PlainMarksByName releaseCachedMarks()
{
return writer->releaseCachedMarks();
}
protected:
/// Remove all columns marked expired in data_part. Also, clears checksums

View File

@ -371,6 +371,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
bool MergeFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWriter write_part_log)
{
part = merge_task->getFuture().get();
auto cached_marks = merge_task->releaseCachedMarks();
storage.merger_mutator.renameMergedTemporaryPart(part, parts, NO_TRANSACTION_PTR, *transaction_ptr);
/// Why we reset task here? Because it holds shared pointer to part and tryRemovePartImmediately will
@ -444,6 +445,9 @@ bool MergeFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWrite
finish_callback = [storage_ptr = &storage]() { storage_ptr->merge_selecting_task->schedule(); };
ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges);
if (auto * mark_cache = storage.getContext()->getMarkCache().get())
addMarksToCache(*part, cached_marks, mark_cache);
write_part_log({});
StorageReplicatedMergeTree::incrementMergedPartsProfileEvent(part->getType());

View File

@ -152,6 +152,12 @@ void MergePlainMergeTreeTask::finish()
ThreadFuzzer::maybeInjectSleep();
ThreadFuzzer::maybeInjectMemoryLimitException();
if (auto * mark_cache = storage.getContext()->getMarkCache().get())
{
auto marks = merge_task->releaseCachedMarks();
addMarksToCache(*new_part, marks, mark_cache);
}
write_part_log({});
StorageMergeTree::incrementMergedPartsProfileEvent(new_part->getType());
transfer_profile_counters_to_initial_query();
@ -163,7 +169,6 @@ void MergePlainMergeTreeTask::finish()
ThreadFuzzer::maybeInjectSleep();
ThreadFuzzer::maybeInjectMemoryLimitException();
}
}
ContextMutablePtr MergePlainMergeTreeTask::createTaskContext() const

View File

@ -93,6 +93,7 @@ namespace MergeTreeSetting
extern const MergeTreeSettingsUInt64 vertical_merge_algorithm_min_columns_to_activate;
extern const MergeTreeSettingsUInt64 vertical_merge_algorithm_min_rows_to_activate;
extern const MergeTreeSettingsBool vertical_merge_remote_filesystem_prefetch;
extern const MergeTreeSettingsBool prewarm_mark_cache;
}
namespace ErrorCodes
@ -546,6 +547,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const
}
}
bool save_marks_in_cache = (*global_ctx->data->getSettings())[MergeTreeSetting::prewarm_mark_cache] && global_ctx->context->getMarkCache();
global_ctx->to = std::make_shared<MergedBlockOutputStream>(
global_ctx->new_data_part,
global_ctx->metadata_snapshot,
@ -555,6 +558,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const
ctx->compression_codec,
global_ctx->txn ? global_ctx->txn->tid : Tx::PrehistoricTID,
/*reset_columns=*/ true,
save_marks_in_cache,
ctx->blocks_are_granules_size,
global_ctx->context->getWriteSettings());
@ -1085,6 +1089,8 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
ctx->executor = std::make_unique<PullingPipelineExecutor>(ctx->column_parts_pipeline);
NamesAndTypesList columns_list = {*ctx->it_name_and_type};
bool save_marks_in_cache = (*global_ctx->data->getSettings())[MergeTreeSetting::prewarm_mark_cache] && global_ctx->context->getMarkCache();
ctx->column_to = std::make_unique<MergedColumnOnlyOutputStream>(
global_ctx->new_data_part,
global_ctx->metadata_snapshot,
@ -1093,6 +1099,7 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
column_pipepline.indexes_to_recalc,
getStatisticsForColumns(columns_list, global_ctx->metadata_snapshot),
&global_ctx->written_offset_columns,
save_marks_in_cache,
global_ctx->to->getIndexGranularity());
ctx->column_elems_written = 0;
@ -1130,6 +1137,10 @@ void MergeTask::VerticalMergeStage::finalizeVerticalMergeForOneColumn() const
auto changed_checksums = ctx->column_to->fillChecksums(global_ctx->new_data_part, global_ctx->checksums_gathered_columns);
global_ctx->checksums_gathered_columns.add(std::move(changed_checksums));
auto cached_marks = ctx->column_to->releaseCachedMarks();
for (auto & [name, marks] : cached_marks)
global_ctx->cached_marks.emplace(name, std::move(marks));
ctx->delayed_streams.emplace_back(std::move(ctx->column_to));
while (ctx->delayed_streams.size() > ctx->max_delayed_streams)
@ -1276,6 +1287,10 @@ bool MergeTask::MergeProjectionsStage::finalizeProjectionsAndWholeMerge() const
else
global_ctx->to->finalizePart(global_ctx->new_data_part, ctx->need_sync, &global_ctx->storage_columns, &global_ctx->checksums_gathered_columns);
auto cached_marks = global_ctx->to->releaseCachedMarks();
for (auto & [name, marks] : cached_marks)
global_ctx->cached_marks.emplace(name, std::move(marks));
global_ctx->new_data_part->getDataPartStorage().precommitTransaction();
global_ctx->promise.set_value(global_ctx->new_data_part);

View File

@ -5,6 +5,7 @@
#include <Common/ProfileEvents.h>
#include <Common/filesystemHelpers.h>
#include <Formats/MarkInCompressedFile.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedReadBufferFromFile.h>
@ -132,6 +133,13 @@ public:
return nullptr;
}
PlainMarksByName releaseCachedMarks() const
{
PlainMarksByName res;
std::swap(global_ctx->cached_marks, res);
return res;
}
bool execute();
private:
@ -209,6 +217,7 @@ private:
std::promise<MergeTreeData::MutableDataPartPtr> promise{};
IMergedBlockOutputStream::WrittenOffsetColumns written_offset_columns{};
PlainMarksByName cached_marks;
MergeTreeTransactionPtr txn;
bool need_prefix;

View File

@ -22,6 +22,7 @@
#include <Common/scope_guard_safe.h>
#include <Common/typeid_cast.h>
#include <Core/Settings.h>
#include <Core/ServerSettings.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Compression/CompressedReadBuffer.h>
#include <Core/QueryProcessingStage.h>
@ -154,6 +155,7 @@ namespace
namespace DB
{
namespace Setting
{
extern const SettingsBool allow_drop_detached;
@ -229,6 +231,12 @@ namespace MergeTreeSetting
extern const MergeTreeSettingsString storage_policy;
extern const MergeTreeSettingsFloat zero_copy_concurrent_part_removal_max_postpone_ratio;
extern const MergeTreeSettingsUInt64 zero_copy_concurrent_part_removal_max_split_times;
extern const MergeTreeSettingsBool prewarm_mark_cache;
}
namespace ServerSetting
{
extern const ServerSettingsDouble mark_cache_prewarm_ratio;
}
namespace ErrorCodes
@ -2335,6 +2343,55 @@ void MergeTreeData::stopOutdatedAndUnexpectedDataPartsLoadingTask()
}
}
void MergeTreeData::prewarmMarkCache(ThreadPool & pool)
{
if (!(*getSettings())[MergeTreeSetting::prewarm_mark_cache])
return;
auto * mark_cache = getContext()->getMarkCache().get();
if (!mark_cache)
return;
auto metadata_snaphost = getInMemoryMetadataPtr();
auto column_names = getColumnsToPrewarmMarks(*getSettings(), metadata_snaphost->getColumns().getAllPhysical());
if (column_names.empty())
return;
Stopwatch watch;
LOG_TRACE(log, "Prewarming mark cache");
auto data_parts = getDataPartsVectorForInternalUsage();
/// Prewarm mark cache firstly for the most fresh parts according
/// to time columns in partition key (if exists) and by modification time.
auto to_tuple = [](const auto & part)
{
return std::make_tuple(part->getMinMaxDate().second, part->getMinMaxTime().second, part->modification_time);
};
std::sort(data_parts.begin(), data_parts.end(), [&to_tuple](const auto & lhs, const auto & rhs)
{
return to_tuple(lhs) > to_tuple(rhs);
});
ThreadPoolCallbackRunnerLocal<void> runner(pool, "PrewarmMarks");
double ratio_to_prewarm = getContext()->getServerSettings()[ServerSetting::mark_cache_prewarm_ratio];
for (const auto & part : data_parts)
{
if (mark_cache->sizeInBytes() >= mark_cache->maxSizeInBytes() * ratio_to_prewarm)
break;
runner([&] { part->loadMarksToCache(column_names, mark_cache); });
}
runner.waitForAllToFinishAndRethrowFirstError();
watch.stop();
LOG_TRACE(log, "Prewarmed mark cache in {} seconds", watch.elapsedSeconds());
}
/// Is the part directory old.
/// True if its modification time and the modification time of all files inside it is less then threshold.
/// (Only files on the first level of nesting are considered).
@ -6371,6 +6428,12 @@ DetachedPartsInfo MergeTreeData::getDetachedParts() const
for (const auto & disk : getDisks())
{
/// While it is possible to have detached parts on readonly/write-once disks
/// (if they were produced on another machine, where it wasn't readonly)
/// to avoid wasting resources for slow disks, avoid trying to enumerate them.
if (disk->isReadOnly() || disk->isWriteOnce())
continue;
String detached_path = fs::path(relative_data_path) / DETACHED_DIR_NAME;
/// Note: we don't care about TOCTOU issue here.

View File

@ -506,6 +506,9 @@ public:
/// Load the set of data parts from disk. Call once - immediately after the object is created.
void loadDataParts(bool skip_sanity_checks, std::optional<std::unordered_set<std::string>> expected_parts);
/// Prewarm mark cache for the most recent data parts.
void prewarmMarkCache(ThreadPool & pool);
String getLogName() const { return log.loadName(); }
Int64 getMaxBlockNumber() const;

View File

@ -136,6 +136,32 @@ void MergeTreeDataPartCompact::loadIndexGranularity()
loadIndexGranularityImpl(index_granularity, index_granularity_info, columns.size(), getDataPartStorage());
}
void MergeTreeDataPartCompact::loadMarksToCache(const Names & column_names, MarkCache * mark_cache) const
{
if (column_names.empty() || !mark_cache)
return;
auto context = storage.getContext();
auto read_settings = context->getReadSettings();
auto * load_marks_threadpool = read_settings.load_marks_asynchronously ? &context->getLoadMarksThreadpool() : nullptr;
auto info_for_read = std::make_shared<LoadedMergeTreeDataPartInfoForReader>(shared_from_this(), std::make_shared<AlterConversions>());
LOG_TEST(getLogger("MergeTreeDataPartCompact"), "Loading marks into mark cache for columns {} of part {}", toString(column_names), name);
MergeTreeMarksLoader loader(
info_for_read,
mark_cache,
index_granularity_info.getMarksFilePath(DATA_FILE_NAME),
index_granularity.getMarksCount(),
index_granularity_info,
/*save_marks_in_cache=*/ true,
read_settings,
load_marks_threadpool,
columns.size());
loader.loadMarks();
}
bool MergeTreeDataPartCompact::hasColumnFiles(const NameAndTypePair & column) const
{
if (!getColumnPosition(column.getNameInStorage()))

View File

@ -54,6 +54,8 @@ public:
std::optional<String> getFileNameForColumn(const NameAndTypePair & /* column */) const override { return DATA_FILE_NAME; }
void loadMarksToCache(const Names & column_names, MarkCache * mark_cache) const override;
~MergeTreeDataPartCompact() override;
protected:

View File

@ -182,6 +182,47 @@ void MergeTreeDataPartWide::loadIndexGranularity()
loadIndexGranularityImpl(index_granularity, index_granularity_info, getDataPartStorage(), *any_column_filename);
}
void MergeTreeDataPartWide::loadMarksToCache(const Names & column_names, MarkCache * mark_cache) const
{
if (column_names.empty() || !mark_cache)
return;
std::vector<std::unique_ptr<MergeTreeMarksLoader>> loaders;
auto context = storage.getContext();
auto read_settings = context->getReadSettings();
auto * load_marks_threadpool = read_settings.load_marks_asynchronously ? &context->getLoadMarksThreadpool() : nullptr;
auto info_for_read = std::make_shared<LoadedMergeTreeDataPartInfoForReader>(shared_from_this(), std::make_shared<AlterConversions>());
LOG_TEST(getLogger("MergeTreeDataPartWide"), "Loading marks into mark cache for columns {} of part {}", toString(column_names), name);
for (const auto & column_name : column_names)
{
auto serialization = getSerialization(column_name);
serialization->enumerateStreams([&](const auto & subpath)
{
auto stream_name = getStreamNameForColumn(column_name, subpath, checksums);
if (!stream_name)
return;
loaders.emplace_back(std::make_unique<MergeTreeMarksLoader>(
info_for_read,
mark_cache,
index_granularity_info.getMarksFilePath(*stream_name),
index_granularity.getMarksCount(),
index_granularity_info,
/*save_marks_in_cache=*/ true,
read_settings,
load_marks_threadpool,
/*num_columns_in_mark=*/ 1));
loaders.back()->startAsyncLoad();
});
}
for (auto & loader : loaders)
loader->loadMarks();
}
bool MergeTreeDataPartWide::isStoredOnRemoteDisk() const
{

View File

@ -51,6 +51,8 @@ public:
std::optional<time_t> getColumnModificationTime(const String & column_name) const override;
void loadMarksToCache(const Names & column_names, MarkCache * mark_cache) const override;
protected:
static void loadIndexGranularityImpl(
MergeTreeIndexGranularity & index_granularity_, MergeTreeIndexGranularityInfo & index_granularity_info_,

View File

@ -1,5 +1,6 @@
#include <Storages/MergeTree/MergeTreeDataPartWriterCompact.h>
#include <Storages/MergeTree/MergeTreeDataPartCompact.h>
#include "Formats/MarkInCompressedFile.h"
namespace DB
{
@ -54,6 +55,11 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
marks_source_hashing = std::make_unique<HashingWriteBuffer>(*marks_compressor);
}
if (settings.save_marks_in_cache)
{
cached_marks[MergeTreeDataPartCompact::DATA_FILE_NAME] = std::make_unique<MarksInCompressedFile::PlainArray>();
}
for (const auto & column : columns_list)
{
auto compression = getCodecDescOrDefault(column.name, default_codec);
@ -255,9 +261,12 @@ void MergeTreeDataPartWriterCompact::writeDataBlock(const Block & block, const G
return &result_stream->hashing_buf;
};
MarkInCompressedFile mark{plain_hashing.count(), static_cast<UInt64>(0)};
writeBinaryLittleEndian(mark.offset_in_compressed_file, marks_out);
writeBinaryLittleEndian(mark.offset_in_decompressed_block, marks_out);
writeBinaryLittleEndian(plain_hashing.count(), marks_out);
writeBinaryLittleEndian(static_cast<UInt64>(0), marks_out);
if (!cached_marks.empty())
cached_marks.begin()->second->push_back(mark);
writeColumnSingleGranule(
block.getByName(name_and_type->name), getSerialization(name_and_type->name),
@ -296,11 +305,17 @@ void MergeTreeDataPartWriterCompact::fillDataChecksums(MergeTreeDataPartChecksum
if (with_final_mark && data_written)
{
MarkInCompressedFile mark{plain_hashing.count(), 0};
for (size_t i = 0; i < columns_list.size(); ++i)
{
writeBinaryLittleEndian(plain_hashing.count(), marks_out);
writeBinaryLittleEndian(static_cast<UInt64>(0), marks_out);
writeBinaryLittleEndian(mark.offset_in_compressed_file, marks_out);
writeBinaryLittleEndian(mark.offset_in_decompressed_block, marks_out);
if (!cached_marks.empty())
cached_marks.begin()->second->push_back(mark);
}
writeBinaryLittleEndian(static_cast<UInt64>(0), marks_out);
}

View File

@ -8,6 +8,7 @@
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/parseQuery.h>
#include <Storages/Statistics/Statistics.h>
#include <Storages/MarkCache.h>
namespace DB
{

View File

@ -6,6 +6,8 @@
#include <Common/escapeForFileName.h>
#include <Columns/ColumnSparse.h>
#include <Common/logger_useful.h>
#include <Storages/MergeTree/MergeTreeMarksLoader.h>
#include <Storages/MarkCache.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
@ -105,6 +107,12 @@ MergeTreeDataPartWriterWide::MergeTreeDataPartWriterWide(
indices_to_recalc_, stats_to_recalc_, marks_file_extension_,
default_codec_, settings_, index_granularity_)
{
if (settings.save_marks_in_cache)
{
auto columns_vec = getColumnsToPrewarmMarks(*storage_settings, columns_list);
columns_to_load_marks = NameSet(columns_vec.begin(), columns_vec.end());
}
for (const auto & column : columns_list)
{
auto compression = getCodecDescOrDefault(column.name, default_codec);
@ -198,6 +206,9 @@ void MergeTreeDataPartWriterWide::addStreams(
settings.marks_compress_block_size,
query_write_settings);
if (columns_to_load_marks.contains(name_and_type.name))
cached_marks.emplace(stream_name, std::make_unique<MarksInCompressedFile::PlainArray>());
full_name_to_stream_name.emplace(full_stream_name, stream_name);
stream_name_to_full_name.emplace(stream_name, full_stream_name);
};
@ -366,8 +377,12 @@ void MergeTreeDataPartWriterWide::flushMarkToFile(const StreamNameAndMark & stre
writeBinaryLittleEndian(stream_with_mark.mark.offset_in_compressed_file, marks_out);
writeBinaryLittleEndian(stream_with_mark.mark.offset_in_decompressed_block, marks_out);
if (settings.can_use_adaptive_granularity)
writeBinaryLittleEndian(rows_in_mark, marks_out);
if (auto it = cached_marks.find(stream_with_mark.stream_name); it != cached_marks.end())
it->second->push_back(stream_with_mark.mark);
}
StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn(
@ -742,7 +757,6 @@ void MergeTreeDataPartWriterWide::fillChecksums(MergeTreeDataPartChecksums & che
fillPrimaryIndexChecksums(checksums);
fillSkipIndicesChecksums(checksums);
fillStatisticsChecksums(checksums);
}
@ -756,7 +770,6 @@ void MergeTreeDataPartWriterWide::finish(bool sync)
finishPrimaryIndexSerialization(sync);
finishSkipIndicesSerialization(sync);
finishStatisticsSerialization(sync);
}

View File

@ -136,6 +136,9 @@ private:
using MarksForColumns = std::unordered_map<String, StreamsWithMarks>;
MarksForColumns last_non_written_marks;
/// Set of columns to put marks in cache during write.
NameSet columns_to_load_marks;
/// How many rows we have already written in the current mark.
/// More than zero when incoming blocks are smaller then their granularity.
size_t rows_written_in_last_mark = 0;

View File

@ -71,10 +71,7 @@ namespace Setting
extern const SettingsString force_data_skipping_indices;
extern const SettingsBool force_index_by_date;
extern const SettingsSeconds lock_acquire_timeout;
extern const SettingsUInt64 max_parser_backtracks;
extern const SettingsUInt64 max_parser_depth;
extern const SettingsInt64 max_partitions_to_read;
extern const SettingsUInt64 max_query_size;
extern const SettingsUInt64 max_threads_for_indexes;
extern const SettingsNonZeroUInt64 max_parallel_replicas;
extern const SettingsUInt64 merge_tree_coarse_index_granularity;
@ -640,20 +637,11 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
if (use_skip_indexes && settings[Setting::force_data_skipping_indices].changed)
{
const auto & indices = settings[Setting::force_data_skipping_indices].toString();
Strings forced_indices;
{
Tokens tokens(indices.data(), indices.data() + indices.size(), settings[Setting::max_query_size]);
IParser::Pos pos(
tokens, static_cast<unsigned>(settings[Setting::max_parser_depth]), static_cast<unsigned>(settings[Setting::max_parser_backtracks]));
Expected expected;
if (!parseIdentifiersOrStringLiterals(pos, expected, forced_indices))
throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "Cannot parse force_data_skipping_indices ('{}')", indices);
}
const auto & indices_str = settings[Setting::force_data_skipping_indices].toString();
auto forced_indices = parseIdentifiersOrStringLiterals(indices_str, settings);
if (forced_indices.empty())
throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "No indices parsed from force_data_skipping_indices ('{}')", indices);
throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "No indices parsed from force_data_skipping_indices ('{}')", indices_str);
std::unordered_set<std::string> useful_indices_names;
for (const auto & useful_index : skip_indexes.useful_indices)

View File

@ -73,6 +73,7 @@ namespace MergeTreeSetting
extern const MergeTreeSettingsFloat min_free_disk_ratio_to_perform_insert;
extern const MergeTreeSettingsBool optimize_row_order;
extern const MergeTreeSettingsFloat ratio_of_defaults_for_sparse_serialization;
extern const MergeTreeSettingsBool prewarm_mark_cache;
}
namespace ErrorCodes
@ -684,6 +685,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(
/// This effectively chooses minimal compression method:
/// either default lz4 or compression method with zero thresholds on absolute and relative part size.
auto compression_codec = data.getContext()->chooseCompressionCodec(0, 0);
bool save_marks_in_cache = (*data_settings)[MergeTreeSetting::prewarm_mark_cache] && data.getContext()->getMarkCache();
auto out = std::make_unique<MergedBlockOutputStream>(
new_data_part,
@ -693,8 +695,9 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(
statistics,
compression_codec,
context->getCurrentTransaction() ? context->getCurrentTransaction()->tid : Tx::PrehistoricTID,
false,
false,
/*reset_columns=*/ false,
save_marks_in_cache,
/*blocks_are_granules_size=*/ false,
context->getWriteSettings());
out->writeWithPermutation(block, perm_ptr);
@ -829,6 +832,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
/// This effectively chooses minimal compression method:
/// either default lz4 or compression method with zero thresholds on absolute and relative part size.
auto compression_codec = data.getContext()->chooseCompressionCodec(0, 0);
bool save_marks_in_cache = (*data.getSettings())[MergeTreeSetting::prewarm_mark_cache] && data.getContext()->getMarkCache();
auto out = std::make_unique<MergedBlockOutputStream>(
new_data_part,
@ -839,7 +843,10 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
ColumnsStatistics{},
compression_codec,
Tx::PrehistoricTID,
false, false, data.getContext()->getWriteSettings());
/*reset_columns=*/ false,
save_marks_in_cache,
/*blocks_are_granules_size=*/ false,
data.getContext()->getWriteSettings());
out->writeWithPermutation(block, perm_ptr);
auto finalizer = out->finalizePartAsync(new_data_part, false);

View File

@ -34,6 +34,7 @@ MergeTreeWriterSettings::MergeTreeWriterSettings(
const MergeTreeSettingsPtr & storage_settings,
bool can_use_adaptive_granularity_,
bool rewrite_primary_key_,
bool save_marks_in_cache_,
bool blocks_are_granules_size_)
: min_compress_block_size(
(*storage_settings)[MergeTreeSetting::min_compress_block_size] ? (*storage_settings)[MergeTreeSetting::min_compress_block_size] : global_settings[Setting::min_compress_block_size])
@ -46,6 +47,7 @@ MergeTreeWriterSettings::MergeTreeWriterSettings(
, primary_key_compress_block_size((*storage_settings)[MergeTreeSetting::primary_key_compress_block_size])
, can_use_adaptive_granularity(can_use_adaptive_granularity_)
, rewrite_primary_key(rewrite_primary_key_)
, save_marks_in_cache(save_marks_in_cache_)
, blocks_are_granules_size(blocks_are_granules_size_)
, query_write_settings(query_write_settings_)
, low_cardinality_max_dictionary_size(global_settings[Setting::low_cardinality_max_dictionary_size])

View File

@ -61,7 +61,8 @@ struct MergeTreeWriterSettings
const MergeTreeSettingsPtr & storage_settings,
bool can_use_adaptive_granularity_,
bool rewrite_primary_key_,
bool blocks_are_granules_size_ = false);
bool save_marks_in_cache_,
bool blocks_are_granules_size_);
size_t min_compress_block_size;
size_t max_compress_block_size;
@ -75,6 +76,7 @@ struct MergeTreeWriterSettings
bool can_use_adaptive_granularity;
bool rewrite_primary_key;
bool save_marks_in_cache;
bool blocks_are_granules_size;
WriteSettings query_write_settings;

View File

@ -3,10 +3,12 @@
#include <Common/threadPoolCallbackRunner.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeMarksLoader.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Common/CurrentMetrics.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <Common/ThreadPool.h>
#include <Common/setThreadName.h>
#include <Parsers/parseIdentifierOrStringLiteral.h>
#include <utility>
@ -21,6 +23,11 @@ namespace ProfileEvents
namespace DB
{
namespace MergeTreeSetting
{
extern const MergeTreeSettingsString columns_to_prewarm_mark_cache;
}
namespace ErrorCodes
{
extern const int CANNOT_READ_ALL_DATA;
@ -211,6 +218,7 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksSync()
if (mark_cache)
{
auto key = MarkCache::hash(fs::path(data_part_storage->getFullPath()) / mrk_path);
if (save_marks_in_cache)
{
auto callback = [this] { return loadMarksImpl(); };
@ -249,4 +257,25 @@ std::future<MarkCache::MappedPtr> MergeTreeMarksLoader::loadMarksAsync()
"LoadMarksThread");
}
void addMarksToCache(const IMergeTreeDataPart & part, const PlainMarksByName & cached_marks, MarkCache * mark_cache)
{
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker;
for (const auto & [stream_name, marks] : cached_marks)
{
auto mark_path = part.index_granularity_info.getMarksFilePath(stream_name);
auto key = MarkCache::hash(fs::path(part.getDataPartStorage().getFullPath()) / mark_path);
mark_cache->set(key, std::make_shared<MarksInCompressedFile>(*marks));
}
}
Names getColumnsToPrewarmMarks(const MergeTreeSettings & settings, const NamesAndTypesList & columns_list)
{
auto columns_str = settings[MergeTreeSetting::columns_to_prewarm_mark_cache].toString();
if (columns_str.empty())
return columns_list.getNames();
return parseIdentifiersOrStringLiterals(columns_str, Context::getGlobalContextInstance()->getSettingsRef());
}
}

View File

@ -77,4 +77,13 @@ private:
using MergeTreeMarksLoaderPtr = std::shared_ptr<MergeTreeMarksLoader>;
class IMergeTreeDataPart;
struct MergeTreeSettings;
/// Adds computed marks for part to the marks cache.
void addMarksToCache(const IMergeTreeDataPart & part, const PlainMarksByName & cached_marks, MarkCache * mark_cache);
/// Returns the list of columns suitable for prewarming of mark cache according to settings.
Names getColumnsToPrewarmMarks(const MergeTreeSettings & settings, const NamesAndTypesList & columns_list);
}

View File

@ -232,6 +232,8 @@ namespace ErrorCodes
DECLARE(UInt64, primary_key_compress_block_size, 65536, "Primary compress block size, the actual size of the block to compress.", 0) \
DECLARE(Bool, primary_key_lazy_load, true, "Load primary key in memory on first use instead of on table initialization. This can save memory in the presence of a large number of tables.", 0) \
DECLARE(Float, primary_key_ratio_of_unique_prefix_values_to_skip_suffix_columns, 0.9f, "If the value of a column of the primary key in data part changes at least in this ratio of times, skip loading next columns in memory. This allows to save memory usage by not loading useless columns of the primary key.", 0) \
DECLARE(Bool, prewarm_mark_cache, false, "If true mark cache will be prewarmed by saving marks to mark cache on inserts, merges, fetches and on startup of server", 0) \
DECLARE(String, columns_to_prewarm_mark_cache, "", "List of columns to prewarm mark cache for (if enabled). Empty means all columns", 0) \
/** Projection settings. */ \
DECLARE(UInt64, max_projections, 25, "The maximum number of merge tree projections.", 0) \
DECLARE(LightweightMutationProjectionMode, lightweight_mutation_projection_mode, LightweightMutationProjectionMode::THROW, "When lightweight delete happens on a table with projection(s), the possible operations include throw the exception as projection exists, or drop projections of this table's relevant parts, or rebuild the projections.", 0) \
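A hedged usage sketch for the two new MergeTree settings, taken from the test added later in this diff (table and column names are illustrative): prewarming is enabled per table, and the column list can be restricted.

```sql
CREATE TABLE t_prewarm_columns (a UInt64, b UInt64, c UInt64, d UInt64)
ENGINE = MergeTree ORDER BY a
SETTINGS min_bytes_for_wide_part = 0, prewarm_mark_cache = 1, columns_to_prewarm_mark_cache = 'a,c';
```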

View File

@ -243,6 +243,15 @@ void MergeTreeSink::finishDelayedChunk()
/// Part can be deduplicated, so increment counters and add to part log only if it's really added
if (added)
{
if (auto * mark_cache = storage.getContext()->getMarkCache().get())
{
for (const auto & stream : partition.temp_part.streams)
{
auto marks = stream.stream->releaseCachedMarks();
addMarksToCache(*part, marks, mark_cache);
}
}
auto counters_snapshot = std::make_shared<ProfileEvents::Counters::Snapshot>(partition.part_counters.getPartiallyAtomicSnapshot());
PartLog::addNewPart(storage.getContext(), PartLog::PartLogEntry(part, partition.elapsed_ns, counters_snapshot));
StorageMergeTree::incrementInsertedPartsProfileEvent(part->getType());

View File

@ -25,6 +25,7 @@ MergedBlockOutputStream::MergedBlockOutputStream(
CompressionCodecPtr default_codec_,
TransactionID tid,
bool reset_columns_,
bool save_marks_in_cache,
bool blocks_are_granules_size,
const WriteSettings & write_settings_,
const MergeTreeIndexGranularity & computed_index_granularity)
@ -39,6 +40,7 @@ MergedBlockOutputStream::MergedBlockOutputStream(
storage_settings,
data_part->index_granularity_info.mark_type.adaptive,
/* rewrite_primary_key = */ true,
save_marks_in_cache,
blocks_are_granules_size);
/// TODO: looks like isStoredOnDisk() is always true for MergeTreeDataPart

View File

@ -24,6 +24,7 @@ public:
CompressionCodecPtr default_codec_,
TransactionID tid,
bool reset_columns_ = false,
bool save_marks_in_cache = false,
bool blocks_are_granules_size = false,
const WriteSettings & write_settings = {},
const MergeTreeIndexGranularity & computed_index_granularity = {});

View File

@ -19,6 +19,7 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
const MergeTreeIndices & indices_to_recalc,
const ColumnsStatistics & stats_to_recalc_,
WrittenOffsetColumns * offset_columns_,
bool save_marks_in_cache,
const MergeTreeIndexGranularity & index_granularity,
const MergeTreeIndexGranularityInfo * index_granularity_info)
: IMergedBlockOutputStream(data_part->storage.getSettings(), data_part->getDataPartStoragePtr(), metadata_snapshot_, columns_list_, /*reset_columns=*/ true)
@ -30,7 +31,9 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
data_part->storage.getContext()->getWriteSettings(),
storage_settings,
index_granularity_info ? index_granularity_info->mark_type.adaptive : data_part->storage.canUseAdaptiveGranularity(),
/* rewrite_primary_key = */ false);
/* rewrite_primary_key = */ false,
save_marks_in_cache,
/* blocks_are_granules_size = */ false);
writer = createMergeTreeDataPartWriter(
data_part->getType(),

View File

@ -22,6 +22,7 @@ public:
const MergeTreeIndices & indices_to_recalc_,
const ColumnsStatistics & stats_to_recalc_,
WrittenOffsetColumns * offset_columns_ = nullptr,
bool save_marks_in_cache = false,
const MergeTreeIndexGranularity & index_granularity = {},
const MergeTreeIndexGranularityInfo * index_granularity_info_ = nullptr);

View File

@ -1623,6 +1623,7 @@ private:
ctx->compression_codec,
ctx->txn ? ctx->txn->tid : Tx::PrehistoricTID,
/*reset_columns=*/ true,
/*save_marks_in_cache=*/ false,
/*blocks_are_granules_size=*/ false,
ctx->context->getWriteSettings(),
computed_granularity);
@ -1851,6 +1852,7 @@ private:
std::vector<MergeTreeIndexPtr>(ctx->indices_to_recalc.begin(), ctx->indices_to_recalc.end()),
ColumnsStatistics(ctx->stats_to_recalc.begin(), ctx->stats_to_recalc.end()),
nullptr,
/*save_marks_in_cache=*/ false,
ctx->source_part->index_granularity,
&ctx->source_part->index_granularity_info
);

View File

@ -481,6 +481,17 @@ void ReplicatedMergeTreeSinkImpl<false>::finishDelayedChunk(const ZooKeeperWithF
/// Set a special error code if the block is duplicate
int error = (deduplicate && deduplicated) ? ErrorCodes::INSERT_WAS_DEDUPLICATED : 0;
auto * mark_cache = storage.getContext()->getMarkCache().get();
if (!error && mark_cache)
{
for (const auto & stream : partition.temp_part.streams)
{
auto marks = stream.stream->releaseCachedMarks();
addMarksToCache(*part, marks, mark_cache);
}
}
auto counters_snapshot = std::make_shared<ProfileEvents::Counters::Snapshot>(partition.part_counters.getPartiallyAtomicSnapshot());
PartLog::addNewPart(storage.getContext(), PartLog::PartLogEntry(part, partition.elapsed_ns, counters_snapshot), ExecutionStatus(error));
StorageReplicatedMergeTree::incrementInsertedPartsProfileEvent(part->getType());
@ -521,8 +532,18 @@ void ReplicatedMergeTreeSinkImpl<true>::finishDelayedChunk(const ZooKeeperWithFa
{
partition.temp_part.finalize();
auto conflict_block_ids = commitPart(zookeeper, partition.temp_part.part, partition.block_id, delayed_chunk->replicas_num).first;
if (conflict_block_ids.empty())
{
if (auto * mark_cache = storage.getContext()->getMarkCache().get())
{
for (const auto & stream : partition.temp_part.streams)
{
auto marks = stream.stream->releaseCachedMarks();
addMarksToCache(*partition.temp_part.part, marks, mark_cache);
}
}
auto counters_snapshot = std::make_shared<ProfileEvents::Counters::Snapshot>(partition.part_counters.getPartiallyAtomicSnapshot());
PartLog::addNewPart(
storage.getContext(),

View File

@ -38,6 +38,7 @@
#include <Common/MemoryTracker.h>
#include <Common/ProfileEventsScope.h>
#include <Common/escapeForFileName.h>
#include <IO/SharedThreadPools.h>
namespace DB
@ -154,6 +155,7 @@ StorageMergeTree::StorageMergeTree(
loadMutations();
loadDeduplicationLog();
prewarmMarkCache(getActivePartsLoadingThreadPool().get());
}

View File

@ -103,6 +103,7 @@
#include <Backups/RestorerFromBackup.h>
#include <Common/scope_guard_safe.h>
#include <IO/SharedThreadPools.h>
#include <boost/algorithm/string/join.hpp>
#include <boost/algorithm/string/replace.hpp>
@ -207,6 +208,7 @@ namespace MergeTreeSetting
extern const MergeTreeSettingsBool use_minimalistic_checksums_in_zookeeper;
extern const MergeTreeSettingsBool use_minimalistic_part_header_in_zookeeper;
extern const MergeTreeSettingsMilliseconds wait_for_unique_parts_send_before_shutdown_ms;
extern const MergeTreeSettingsBool prewarm_mark_cache;
}
namespace FailPoints
@ -507,6 +509,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
}
loadDataParts(skip_sanity_checks, expected_parts_on_this_replica);
prewarmMarkCache(getActivePartsLoadingThreadPool().get());
if (LoadingStrictnessLevel::ATTACH <= mode)
{
@ -5079,6 +5082,12 @@ bool StorageReplicatedMergeTree::fetchPart(
ProfileEvents::increment(ProfileEvents::ObsoleteReplicatedParts);
}
if ((*getSettings())[MergeTreeSetting::prewarm_mark_cache] && getContext()->getMarkCache())
{
auto column_names = getColumnsToPrewarmMarks(*getSettings(), part->getColumns());
part->loadMarksToCache(column_names, getContext()->getMarkCache().get());
}
write_part_log({});
}
else

View File

@ -1,5 +1,7 @@
#!/usr/bin/env bash
set -e -o pipefail
ROOT_PATH="$(git rev-parse --show-toplevel)"
IFS=$'\t'

View File

@ -126,8 +126,6 @@ void attachSystemTablesServer(ContextPtr context, IDatabase & system_database, b
attachNoDescription<StorageSystemOne>(context, system_database, "one", "This table contains a single row with a single dummy UInt8 column containing the value 0. Used when the table is not specified explicitly, for example in queries like `SELECT 1`.");
attachNoDescription<StorageSystemNumbers>(context, system_database, "numbers", "Generates all natural numbers, starting from 0 (to 2^64 - 1, and then again) in sorted order.", false, "number");
attachNoDescription<StorageSystemNumbers>(context, system_database, "numbers_mt", "Multithreaded version of `system.numbers`. Numbers order is not guaranteed.", true, "number");
attachNoDescription<StorageSystemNumbers>(context, system_database, "generate_series", "Generates arithmetic progression of natural numbers in sorted order in a given segment with a given step", false, "generate_series");
attachNoDescription<StorageSystemNumbers>(context, system_database, "generateSeries", "Generates arithmetic progression of natural numbers in sorted order in a given segment with a given step", false, "generate_series");
attachNoDescription<StorageSystemZeros>(context, system_database, "zeros", "Produces unlimited number of non-materialized zeros.", false);
attachNoDescription<StorageSystemZeros>(context, system_database, "zeros_mt", "Multithreaded version of system.zeros.", true);
attach<StorageSystemDatabases>(context, system_database, "databases", "Lists all databases of the current server.");

View File

@ -1,4 +1,5 @@
<clickhouse>
<asynchronous_metrics_update_period_s>1</asynchronous_metrics_update_period_s>
<asynchronous_metrics_enable_heavy_metrics>1</asynchronous_metrics_enable_heavy_metrics>
<asynchronous_heavy_metrics_update_period_s>1</asynchronous_heavy_metrics_update_period_s>
</clickhouse>

View File

@ -108,6 +108,7 @@ TABLE ENGINE ['TABLE ENGINE'] TABLE_ENGINE ALL
SYSTEM SHUTDOWN ['SYSTEM KILL','SHUTDOWN'] GLOBAL SYSTEM
SYSTEM DROP DNS CACHE ['SYSTEM DROP DNS','DROP DNS CACHE','DROP DNS'] GLOBAL SYSTEM DROP CACHE
SYSTEM DROP CONNECTIONS CACHE ['SYSTEM DROP CONNECTIONS CACHE','DROP CONNECTIONS CACHE'] GLOBAL SYSTEM DROP CACHE
SYSTEM PREWARM MARK CACHE ['SYSTEM PREWARM MARK','PREWARM MARK CACHE','PREWARM MARKS'] GLOBAL SYSTEM DROP CACHE
SYSTEM DROP MARK CACHE ['SYSTEM DROP MARK','DROP MARK CACHE','DROP MARKS'] GLOBAL SYSTEM DROP CACHE
SYSTEM DROP UNCOMPRESSED CACHE ['SYSTEM DROP UNCOMPRESSED','DROP UNCOMPRESSED CACHE','DROP UNCOMPRESSED'] GLOBAL SYSTEM DROP CACHE
SYSTEM DROP MMAP CACHE ['SYSTEM DROP MMAP','DROP MMAP CACHE','DROP MMAP'] GLOBAL SYSTEM DROP CACHE

View File

@ -16,7 +16,8 @@ select distinct
from (
select string_value
from test_table
);
)
order by all;
select distinct
'constant_1' as constant_value, *

View File

@ -2,7 +2,6 @@
CREATE MATERIALIZED VIEW default.a\nREFRESH EVERY 2 SECOND\n(\n `x` UInt64\n)\nENGINE = Memory\nAS SELECT number AS x\nFROM numbers(2)\nUNION ALL\nSELECT rand64() AS x
<2: refreshed> 3 1 1
<3: time difference at least> 1000
<4: next refresh in> 2 Scheduled
<4.1: fake clock> Scheduled 2050-01-01 00:00:01 2050-01-01 00:00:02 1 3 3 3 0
<4.5: altered> Scheduled 2050-01-01 00:00:01 2052-01-01 00:00:00
CREATE MATERIALIZED VIEW default.a\nREFRESH EVERY 2 YEAR\n(\n `x` UInt64\n)\nENGINE = Memory\nAS SELECT x * 2 AS x\nFROM default.src

View File

@ -50,14 +50,6 @@ done
# to make sure the clock+timer code works at all. If it turns out flaky, increase refresh period above.
$CLICKHOUSE_CLIENT -q "
select '<3: time difference at least>', min2(reinterpret(now64(), 'Int64') - $start_time, 1000);"
while :
do
# Wait for status to change to Scheduled. If status = Scheduling, next_refresh_time is stale.
res="`$CLICKHOUSE_CLIENT -q "select '<4: next refresh in>', next_refresh_time-last_success_time, status from refreshes -- $LINENO"`"
echo "$res" | grep -q 'Scheduled' && break
sleep 0.5
done
echo "$res"
# Create a source table from which views will read.
$CLICKHOUSE_CLIENT -q "

View File

@ -0,0 +1,6 @@
1
1
1
4
4
4

View File

@ -0,0 +1,30 @@
-- Tags: no-parallel, no-random-settings, no-random-merge-tree-settings
DROP TABLE IF EXISTS t_prewarm_columns;
CREATE TABLE t_prewarm_columns (a UInt64, b UInt64, c UInt64, d UInt64)
ENGINE = MergeTree ORDER BY a
SETTINGS min_bytes_for_wide_part = 0, prewarm_mark_cache = 1, columns_to_prewarm_mark_cache = 'a,c';
INSERT INTO t_prewarm_columns VALUES (1, 1, 1, 1);
SELECT count() FROM t_prewarm_columns WHERE NOT ignore(*);
SYSTEM DROP MARK CACHE;
DETACH TABLE t_prewarm_columns;
ATTACH TABLE t_prewarm_columns;
SELECT count() FROM t_prewarm_columns WHERE NOT ignore(*);
SYSTEM DROP MARK CACHE;
SYSTEM PREWARM MARK CACHE t_prewarm_columns;
SELECT count() FROM t_prewarm_columns WHERE NOT ignore(*);
SYSTEM FLUSH LOGS;
SELECT ProfileEvents['LoadedMarksCount'] FROM system.query_log
WHERE current_database = currentDatabase() AND type = 'QueryFinish' AND query LIKE 'SELECT count() FROM t_prewarm_columns%'
ORDER BY event_time_microseconds;
DROP TABLE t_prewarm_columns;

View File

@ -0,0 +1,16 @@
20000
20000
40000
40000
40000
40000
40000
40000
0
0
0
0
0
0
1
0

View File

@ -0,0 +1,65 @@
-- Tags: no-parallel, no-shared-merge-tree
DROP TABLE IF EXISTS t_prewarm_cache_rmt_1;
DROP TABLE IF EXISTS t_prewarm_cache_rmt_2;
CREATE TABLE t_prewarm_cache_rmt_1 (a UInt64, b UInt64, c UInt64)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/03254_prewarm_mark_cache_smt/t_prewarm_cache', '1')
ORDER BY a SETTINGS prewarm_mark_cache = 1;
CREATE TABLE t_prewarm_cache_rmt_2 (a UInt64, b UInt64, c UInt64)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/03254_prewarm_mark_cache_smt/t_prewarm_cache', '2')
ORDER BY a SETTINGS prewarm_mark_cache = 1;
SYSTEM DROP MARK CACHE;
SYSTEM STOP FETCHES t_prewarm_cache_rmt_2;
-- Check that prewarm works on insert.
INSERT INTO t_prewarm_cache_rmt_1 SELECT number, rand(), rand() FROM numbers(20000);
SELECT count() FROM t_prewarm_cache_rmt_1 WHERE NOT ignore(*);
-- Check that prewarm works on fetch.
SYSTEM DROP MARK CACHE;
SYSTEM START FETCHES t_prewarm_cache_rmt_2;
SYSTEM SYNC REPLICA t_prewarm_cache_rmt_2;
SELECT count() FROM t_prewarm_cache_rmt_2 WHERE NOT ignore(*);
-- Check that prewarm works on merge.
INSERT INTO t_prewarm_cache_rmt_1 SELECT number, rand(), rand() FROM numbers(20000);
OPTIMIZE TABLE t_prewarm_cache_rmt_1 FINAL;
SYSTEM SYNC REPLICA t_prewarm_cache_rmt_2;
SELECT count() FROM t_prewarm_cache_rmt_1 WHERE NOT ignore(*);
SELECT count() FROM t_prewarm_cache_rmt_2 WHERE NOT ignore(*);
-- Check that prewarm works on restart.
SYSTEM DROP MARK CACHE;
DETACH TABLE t_prewarm_cache_rmt_1;
DETACH TABLE t_prewarm_cache_rmt_2;
ATTACH TABLE t_prewarm_cache_rmt_1;
ATTACH TABLE t_prewarm_cache_rmt_2;
SELECT count() FROM t_prewarm_cache_rmt_1 WHERE NOT ignore(*);
SELECT count() FROM t_prewarm_cache_rmt_2 WHERE NOT ignore(*);
SYSTEM DROP MARK CACHE;
SELECT count() FROM t_prewarm_cache_rmt_1 WHERE NOT ignore(*);
--- Check that system query works.
SYSTEM PREWARM MARK CACHE t_prewarm_cache_rmt_1;
SELECT count() FROM t_prewarm_cache_rmt_1 WHERE NOT ignore(*);
SYSTEM FLUSH LOGS;
SELECT ProfileEvents['LoadedMarksCount'] > 0 FROM system.query_log
WHERE current_database = currentDatabase() AND type = 'QueryFinish' AND query LIKE 'SELECT count() FROM t_prewarm_cache%'
ORDER BY event_time_microseconds;
DROP TABLE IF EXISTS t_prewarm_cache_rmt_1;
DROP TABLE IF EXISTS t_prewarm_cache_rmt_2;

View File

@ -0,0 +1,4 @@
Row 1:
──────
id: 1
visits: 115

View File

@ -0,0 +1,34 @@
CREATE TABLE test_table
(
dt DateTime,
id UInt32,
url String,
visits UInt32
)
ENGINE ReplacingMergeTree
ORDER BY (dt, id)
PARTITION BY toYYYYMM(dt);
SYSTEM STOP merges test_table;
INSERT INTO test_table VALUES (toDate('2024-10-24'), 1, '/index', 100);
INSERT INTO test_table VALUES (toDate('2024-10-24'), 1, '/index', 101);
INSERT INTO test_table VALUES (toDate('2024-10-24'), 1, '/index', 102);
INSERT INTO test_table VALUES (toDate('2024-10-24'), 1, '/index', 103);
INSERT INTO test_table VALUES (toDate('2024-10-24'), 1, '/index', 104);
INSERT INTO test_table VALUES (toDate('2024-10-24'), 1, '/index', 105);
INSERT INTO test_table VALUES (toDate('2024-10-24'), 1, '/index', 106);
INSERT INTO test_table VALUES (toDate('2024-10-24'), 1, '/index', 107);
INSERT INTO test_table VALUES (toDate('2024-10-24'), 1, '/index', 108);
INSERT INTO test_table VALUES (toDate('2024-10-24'), 1, '/index', 109);
INSERT INTO test_table VALUES (toDate('2024-10-24'), 1, '/index', 110);
INSERT INTO test_table VALUES (toDate('2024-10-24'), 1, '/index', 111);
INSERT INTO test_table VALUES (toDate('2024-10-24'), 1, '/index', 112);
INSERT INTO test_table VALUES (toDate('2024-10-24'), 1, '/index', 113);
INSERT INTO test_table VALUES (toDate('2024-10-24'), 1, '/index', 114);
INSERT INTO test_table VALUES (toDate('2024-10-24'), 1, '/index', 115);
ALTER TABLE test_table DETACH PARTITION 202410;
ALTER TABLE test_table ATTACH PARTITION 202410;
SELECT id, visits FROM test_table FINAL ORDER BY id FORMAT Vertical;

View File

@ -1,6 +1,7 @@
#!/usr/bin/env bash
if [[ "$OSTYPE" == "darwin"* ]]; then
if [[ "$OSTYPE" == "darwin"* ]]
then
# use GNU versions, their presence is ensured in cmake/tools.cmake
GREP_CMD=ggrep
FIND_CMD=gfind
@ -12,31 +13,35 @@ fi
ROOT_PATH="$(git rev-parse --show-toplevel)"
LIBS_PATH="${ROOT_PATH}/contrib"
mapfile -t libs < <(echo "${ROOT_PATH}/base/poco"; find "${LIBS_PATH}" -type d -maxdepth 1 ! -name '*-cmake' | LC_ALL=C sort)
for LIB in "${libs[@]}"; do
mapfile -t libs < <(echo "${ROOT_PATH}/base/poco"; find "${LIBS_PATH}" -maxdepth 1 -type d -not -name '*-cmake' -not -name 'rust_vendor' | LC_ALL=C sort)
for LIB in "${libs[@]}"
do
LIB_NAME=$(basename "$LIB")
LIB_LICENSE=$(
LC_ALL=C ${FIND_CMD} "$LIB" -type f -and '(' -iname 'LICENSE*' -or -iname 'COPYING*' -or -iname 'COPYRIGHT*' ')' -and -not '(' -iname '*.html' -or -iname '*.htm' -or -iname '*.rtf' -or -name '*.cpp' -or -name '*.h' -or -iname '*.json' ')' -printf "%d\t%p\n" |
LC_ALL=C ${FIND_CMD} "$LIB" -type f -and '(' -iname 'LICENSE*' -or -iname 'COPYING*' -or -iname 'COPYRIGHT*' -or -iname 'NOTICE' ')' -and -not '(' -iname '*.html' -or -iname '*.htm' -or -iname '*.rtf' -or -name '*.cpp' -or -name '*.h' -or -iname '*.json' ')' -printf "%d\t%p\n" |
LC_ALL=C sort | LC_ALL=C awk '
BEGIN { IGNORECASE=1; min_depth = 0 }
/LICENSE/ { if (!min_depth || $1 <= min_depth) { min_depth = $1; license = $2 } }
/COPY/ { if (!min_depth || $1 <= min_depth) { min_depth = $1; copying = $2 } }
END { if (license) { print license } else { print copying } }')
if [ -n "$LIB_LICENSE" ]; then
/NOTICE/ { if (!min_depth || $1 <= min_depth) { min_depth = $1; notice = $2 } }
END { if (license) { print license } else if (copying) { print copying } else { print notice } }')
if [ -n "$LIB_LICENSE" ]
then
LICENSE_TYPE=$(
(${GREP_CMD} -q -F 'Apache' "$LIB_LICENSE" &&
echo "Apache") ||
(${GREP_CMD} -q -F 'Boost' "$LIB_LICENSE" &&
echo "Boost") ||
(${GREP_CMD} -q -i -P 'public\s*domain' "$LIB_LICENSE" &&
(${GREP_CMD} -q -i -P 'public\s*domain|CC0 1\.0 Universal' "$LIB_LICENSE" &&
echo "Public Domain") ||
(${GREP_CMD} -q -F 'BSD' "$LIB_LICENSE" &&
echo "BSD") ||
(${GREP_CMD} -q -F 'Lesser General Public License' "$LIB_LICENSE" &&
echo "LGPL") ||
(${GREP_CMD} -q -F 'General Public License' "$LIB_LICENSE" &&
echo "GPL") ||
(${GREP_CMD} -q -i -F 'The origin of this software must not be misrepresented' "$LIB_LICENSE" &&
${GREP_CMD} -q -i -F 'Altered source versions must be plainly marked as such' "$LIB_LICENSE" &&
${GREP_CMD} -q -i -F 'This notice may not be removed or altered' "$LIB_LICENSE" &&
@ -73,8 +78,23 @@ for LIB in "${libs[@]}"; do
echo "HPND") ||
echo "Unknown")
if [ "$LICENSE_TYPE" == "GPL" ]
then
echo "Fatal error: General Public License found in ${LIB_NAME}." >&2
exit 1
fi
if [ "$LICENSE_TYPE" == "Unknown" ]
then
echo "Fatal error: sources with unknown license found in ${LIB_NAME}." >&2
exit 1
fi
RELATIVE_PATH=$(echo "$LIB_LICENSE" | sed -r -e 's!^.+/(contrib|base)/!/\1/!')
echo -e "$LIB_NAME\t$LICENSE_TYPE\t$RELATIVE_PATH"
fi
done
# Special care for Rust
find "${LIBS_PATH}/rust_vendor/" -name 'Cargo.toml' | xargs grep 'license = ' | (grep -v -P 'MIT|Apache|MPL' && echo "Fatal error: unrecognized licenses in the Rust code" >&2 && exit 1 || true)