Merge remote-tracking branch 'blessed/master' into dragonbox

This commit is contained in:
Raúl Marín 2024-12-03 00:37:01 +01:00
commit 7afee244c5
64 changed files with 525 additions and 140 deletions

View File

@ -66,6 +66,7 @@ if (ENABLE_CHECK_HEAVY_BUILDS)
# Twice as large
set (RLIMIT_DATA 10000000000)
set (RLIMIT_AS 20000000000)
set (RLIMIT_CPU 2000)
endif()
# For some files currently building RISCV64/LOONGARCH64 might be too slow.

View File

@ -256,10 +256,6 @@ AllocationTrace MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceed
Int64 will_be = size ? size + amount.fetch_add(size, std::memory_order_relaxed) : amount.load(std::memory_order_relaxed);
Int64 will_be_rss = size ? size + rss.fetch_add(size, std::memory_order_relaxed) : rss.load(std::memory_order_relaxed);
auto metric_loaded = metric.load(std::memory_order_relaxed);
if (metric_loaded != CurrentMetrics::end() && size)
CurrentMetrics::add(metric_loaded, size);
Int64 current_hard_limit = hard_limit.load(std::memory_order_relaxed);
Int64 current_profiler_limit = profiler_limit.load(std::memory_order_relaxed);
@ -371,6 +367,10 @@ AllocationTrace MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceed
}
}
auto metric_loaded = metric.load(std::memory_order_relaxed);
if (metric_loaded != CurrentMetrics::end() && size)
CurrentMetrics::add(metric_loaded, size);
if (peak_updated && allocation_traced)
{
MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);

View File

@ -229,6 +229,7 @@
\
M(WaitMarksLoadMicroseconds, "Time spent loading marks", ValueType::Microseconds) \
M(BackgroundLoadingMarksTasks, "Number of background tasks for loading marks", ValueType::Number) \
M(LoadedMarksFiles, "Number of mark files loaded.", ValueType::Number) \
M(LoadedMarksCount, "Number of marks loaded (total across columns).", ValueType::Number) \
M(LoadedMarksMemoryBytes, "Size of in-memory representations of loaded marks.", ValueType::Bytes) \
M(LoadedPrimaryIndexFiles, "Number of primary index files loaded.", ValueType::Number) \
@ -814,6 +815,7 @@ The server successfully detected this situation and will download merged part fr
M(LogWarning, "Number of log messages with level Warning", ValueType::Number) \
M(LogError, "Number of log messages with level Error", ValueType::Number) \
M(LogFatal, "Number of log messages with level Fatal", ValueType::Number) \
M(LoggerElapsedNanoseconds, "Cumulative time spend in logging", ValueType::Nanoseconds) \
\
M(InterfaceHTTPSendBytes, "Number of bytes sent through HTTP interfaces", ValueType::Bytes) \
M(InterfaceHTTPReceiveBytes, "Number of bytes received through HTTP interfaces", ValueType::Bytes) \
@ -1087,6 +1089,11 @@ void incrementForLogMessage(Poco::Message::Priority priority)
}
}
void incrementLoggerElapsedNanoseconds(UInt64 ns)
{
increment(LoggerElapsedNanoseconds, ns);
}
CountersIncrement::CountersIncrement(Counters::Snapshot const & snapshot)
{
init();

View File

@ -173,6 +173,9 @@ namespace ProfileEvents
/// Increment a counter for log messages.
void incrementForLogMessage(Poco::Message::Priority priority);
/// Increment time consumed by logging.
void incrementLoggerElapsedNanoseconds(UInt64 ns);
/// Get name of event by identifier. Returns statically allocated string.
const char * getName(Event event);

View File

@ -11,6 +11,7 @@
#include <Common/Logger.h>
#include <Common/LoggingFormatStringHelpers.h>
#include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h>
#define LogToStr(x, y) std::make_unique<LogToStrImpl>(x, y)
@ -69,6 +70,7 @@ namespace impl
if (!_is_clients_log && !_logger->is((PRIORITY))) \
break; \
\
Stopwatch _logger_watch; \
try \
{ \
ProfileEvents::incrementForLogMessage(PRIORITY); \
@ -122,6 +124,7 @@ namespace impl
{ \
::write(STDERR_FILENO, static_cast<const void *>(MESSAGE_FOR_EXCEPTION_ON_LOGGING), sizeof(MESSAGE_FOR_EXCEPTION_ON_LOGGING)); \
} \
ProfileEvents::incrementLoggerElapsedNanoseconds(_logger_watch.elapsedNanoseconds()); \
} while (false)

View File

@ -35,61 +35,71 @@ std::string makeRegexpPatternFromGlobs(const std::string & initial_str_with_glob
}
std::string escaped_with_globs = buf_for_escaping.str();
static const re2::RE2 enum_or_range(R"({([\d]+\.\.[\d]+|[^{}*,]+,[^{}*]*[^{}*,])})"); /// regexp for {expr1,expr2,expr3} or {M..N}, where M and N - non-negative integers, expr's should be without "{", "}", "*" and ","
std::string_view input(escaped_with_globs);
static const re2::RE2 range_regex(R"({([\d]+\.\.[\d]+)})"); /// regexp for {M..N}, where M and N - non-negative integers
static const re2::RE2 enum_regex(R"({([^{}*,]+[^{}*]*[^{}*,])})"); /// regexp for {expr1,expr2,expr3}, expr's should be without "{", "}", "*" and ","
std::string_view matched;
std::string_view input(escaped_with_globs);
std::ostringstream oss_for_replacing; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss_for_replacing.exceptions(std::ios::failbit);
size_t current_index = 0;
while (RE2::FindAndConsume(&input, enum_or_range, &matched))
while (RE2::FindAndConsume(&input, range_regex, &matched))
{
std::string buffer(matched);
oss_for_replacing << escaped_with_globs.substr(current_index, matched.data() - escaped_with_globs.data() - current_index - 1) << '(';
if (!buffer.contains(','))
size_t range_begin = 0;
size_t range_end = 0;
char point;
ReadBufferFromString buf_range(buffer);
buf_range >> range_begin >> point >> point >> range_end;
size_t range_begin_width = buffer.find('.');
size_t range_end_width = buffer.size() - buffer.find_last_of('.') - 1;
bool leading_zeros = buffer[0] == '0';
size_t output_width = 0;
if (range_begin > range_end) //Descending Sequence {20..15} {9..01}
{
size_t range_begin = 0;
size_t range_end = 0;
char point;
ReadBufferFromString buf_range(buffer);
buf_range >> range_begin >> point >> point >> range_end;
std::swap(range_begin,range_end);
leading_zeros = buffer[buffer.find_last_of('.')+1]=='0';
std::swap(range_begin_width,range_end_width);
}
if (range_begin_width == 1 && leading_zeros)
output_width = 1; ///Special Case: {0..10} {0..999}
else
output_width = std::max(range_begin_width, range_end_width);
size_t range_begin_width = buffer.find('.');
size_t range_end_width = buffer.size() - buffer.find_last_of('.') - 1;
bool leading_zeros = buffer[0] == '0';
size_t output_width = 0;
if (range_begin > range_end) //Descending Sequence {20..15} {9..01}
{
std::swap(range_begin,range_end);
leading_zeros = buffer[buffer.find_last_of('.')+1]=='0';
std::swap(range_begin_width,range_end_width);
}
if (range_begin_width == 1 && leading_zeros)
output_width = 1; ///Special Case: {0..10} {0..999}
else
output_width = std::max(range_begin_width, range_end_width);
if (leading_zeros)
oss_for_replacing << std::setfill('0') << std::setw(static_cast<int>(output_width));
oss_for_replacing << range_begin;
for (size_t i = range_begin + 1; i <= range_end; ++i)
{
oss_for_replacing << '|';
if (leading_zeros)
oss_for_replacing << std::setfill('0') << std::setw(static_cast<int>(output_width));
oss_for_replacing << range_begin;
oss_for_replacing << i;
}
for (size_t i = range_begin + 1; i <= range_end; ++i)
{
oss_for_replacing << '|';
if (leading_zeros)
oss_for_replacing << std::setfill('0') << std::setw(static_cast<int>(output_width));
oss_for_replacing << i;
}
}
else
{
std::replace(buffer.begin(), buffer.end(), ',', '|');
oss_for_replacing << buffer;
}
oss_for_replacing << ")";
current_index = input.data() - escaped_with_globs.data();
}
while (RE2::FindAndConsume(&input, enum_regex, &matched))
{
std::string buffer(matched);
oss_for_replacing << escaped_with_globs.substr(current_index, matched.data() - escaped_with_globs.data() - current_index - 1) << '(';
std::replace(buffer.begin(), buffer.end(), ',', '|');
oss_for_replacing << buffer;
oss_for_replacing << ")";
current_index = input.data() - escaped_with_globs.data();
}
oss_for_replacing << escaped_with_globs.substr(current_index);
std::string almost_res = oss_for_replacing.str();
WriteBufferFromOwnString buf_final_processing;

View File

@ -12,6 +12,9 @@ TEST(Common, makeRegexpPatternFromGlobs)
EXPECT_EQ(makeRegexpPatternFromGlobs("*"), "[^/]*");
EXPECT_EQ(makeRegexpPatternFromGlobs("/?"), "/[^/]");
EXPECT_EQ(makeRegexpPatternFromGlobs("/*"), "/[^/]*");
EXPECT_EQ(makeRegexpPatternFromGlobs("{123}"), "(123)");
EXPECT_EQ(makeRegexpPatternFromGlobs("{test}"), "(test)");
EXPECT_EQ(makeRegexpPatternFromGlobs("{test.tar.gz}"), "(test\\.tar\\.gz)");
EXPECT_EQ(makeRegexpPatternFromGlobs("*_{{a,b,c,d}}/?.csv"), "[^/]*_\\{(a|b|c|d)\\}/[^/]\\.csv");
/* Regex Parsing for {..} can have three possible cases
1) The left range width == the right range width

View File

@ -142,13 +142,14 @@ void KeeperStateMachine<Storage>::init()
}
catch (...)
{
tryLogCurrentException(
LOG_FATAL(
log,
fmt::format(
"Aborting because of failure to load from latest snapshot with index {}. Problematic snapshot can be removed but it will "
"lead to data loss",
latest_log_index));
std::abort();
"Failure to load from latest snapshot with index {}: {}",
latest_log_index,
getCurrentExceptionMessage(true, true, false));
LOG_FATAL(
log, "Manual intervention is necessary for recovery. Problematic snapshot can be removed but it will lead to data loss");
abort();
}
}
@ -427,8 +428,13 @@ bool KeeperStateMachine<Storage>::preprocess(const KeeperStorageBase::RequestFor
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__, fmt::format("Failed to preprocess stored log at index {}, aborting to avoid inconsistent state", request_for_session.log_idx));
std::abort();
LOG_FATAL(
log,
"Failed to preprocess stored log at index {}: {}",
request_for_session.log_idx,
getCurrentExceptionMessage(true, true, false));
LOG_FATAL(log, "Aborting to avoid inconsistent state");
abort();
}
if (keeper_context->digestEnabled() && request_for_session.digest)

View File

@ -8,6 +8,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
template<typename V>
struct ListNode
{
@ -292,7 +297,8 @@ public:
{
size_t hash_value = map.hash(key);
auto it = map.find(key, hash_value);
chassert(it != map.end());
if (it == map.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not find key: '{}'", key);
auto list_itr = it->getMapped();
uint64_t old_value_size = list_itr->value.sizeInBytes();
@ -348,7 +354,8 @@ public:
const V & getValue(StringRef key) const
{
auto it = map.find(key);
chassert(it);
if (it == map.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not find key: '{}'", key);
return it->getMapped()->value;
}
@ -356,7 +363,8 @@ public:
{
for (auto & itr : snapshot_invalid_iters)
{
chassert(!itr->isActiveInMap());
if (itr->isActiveInMap())
throw Exception(ErrorCodes::LOGICAL_ERROR, "{} is not active in map", itr->key);
updateDataSize(ERASE, itr->key.size, 0, itr->value.sizeInBytes(), /*remove_old=*/true);
if (itr->getFreeKey())
arena.free(const_cast<char *>(itr->key.data), itr->key.size);

View File

@ -1242,6 +1242,9 @@ Set the quoting rule for identifiers in SHOW CREATE query
)", 0) \
DECLARE(IdentifierQuotingStyle, show_create_query_identifier_quoting_style, IdentifierQuotingStyle::Backticks, R"(
Set the quoting style for identifiers in SHOW CREATE query
)", 0) \
DECLARE(String, composed_data_type_output_format_mode, "default", R"(
Set composed data type output format mode, default or spark.
)", 0) \
// End of FORMAT_FACTORY_SETTINGS

View File

@ -67,6 +67,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"max_bytes_ratio_before_external_group_by", 0., 0., "New setting."},
{"max_bytes_ratio_before_external_sort", 0., 0., "New setting."},
{"use_async_executor_for_materialized_views", false, false, "New setting."},
{"composed_data_type_output_format_mode", "default", "default", "New setting"},
{"http_response_headers", "", "", "New setting."},
}
},

View File

@ -401,7 +401,7 @@ void SerializationArray::deserializeBinaryBulkWithMultipleStreams(
template <typename Writer>
static void serializeTextImpl(const IColumn & column, size_t row_num, WriteBuffer & ostr, Writer && write_nested)
static void serializeTextImpl(const IColumn & column, size_t row_num, const FormatSettings & settings, WriteBuffer & ostr, Writer && write_nested)
{
const ColumnArray & column_array = assert_cast<const ColumnArray &>(column);
const ColumnArray::Offsets & offsets = column_array.getOffsets();
@ -412,10 +412,14 @@ static void serializeTextImpl(const IColumn & column, size_t row_num, WriteBuffe
const IColumn & nested_column = column_array.getData();
writeChar('[', ostr);
for (size_t i = offset; i < next_offset; ++i)
if (next_offset != offset)
write_nested(nested_column, offset);
for (size_t i = offset + 1; i < next_offset; ++i)
{
if (i != offset)
writeChar(',', ostr);
writeChar(',', ostr);
if (settings.composed_data_type_output_format_mode == "spark")
writeChar(' ', ostr);
write_nested(nested_column, i);
}
writeChar(']', ostr);
@ -520,10 +524,13 @@ static ReturnType deserializeTextImpl(IColumn & column, ReadBuffer & istr, Reade
void SerializationArray::serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
serializeTextImpl(column, row_num, ostr,
serializeTextImpl(column, row_num, settings, ostr,
[&](const IColumn & nested_column, size_t i)
{
nested->serializeTextQuoted(nested_column, i, ostr, settings);
if (settings.composed_data_type_output_format_mode == "spark")
nested->serializeText(nested_column, i, ostr, settings);
else
nested->serializeTextQuoted(nested_column, i, ostr, settings);
});
}

View File

@ -90,6 +90,7 @@ template <typename KeyWriter, typename ValueWriter>
void SerializationMap::serializeTextImpl(
const IColumn & column,
size_t row_num,
const FormatSettings & settings,
WriteBuffer & ostr,
KeyWriter && key_writer,
ValueWriter && value_writer) const
@ -104,15 +105,31 @@ void SerializationMap::serializeTextImpl(
size_t next_offset = offsets[row_num];
writeChar('{', ostr);
for (size_t i = offset; i < next_offset; ++i)
if (offset != next_offset)
{
if (i != offset)
writeChar(',', ostr);
key_writer(ostr, key, nested_tuple.getColumn(0), i);
writeChar(':', ostr);
value_writer(ostr, value, nested_tuple.getColumn(1), i);
key_writer(ostr, key, nested_tuple.getColumn(0), offset);
if (settings.composed_data_type_output_format_mode == "spark")
writeString(std::string_view(" -> "), ostr);
else
writeChar(':', ostr);
value_writer(ostr, value, nested_tuple.getColumn(1), offset);
}
if (settings.composed_data_type_output_format_mode == "spark")
for (size_t i = offset + 1; i < next_offset; ++i)
{
writeString(std::string_view(", "), ostr);
key_writer(ostr, key, nested_tuple.getColumn(0), i);
writeString(std::string_view(" -> "), ostr);
value_writer(ostr, value, nested_tuple.getColumn(1), i);
}
else
for (size_t i = offset + 1; i < next_offset; ++i)
{
writeChar(',', ostr);
key_writer(ostr, key, nested_tuple.getColumn(0), i);
writeChar(':', ostr);
value_writer(ostr, value, nested_tuple.getColumn(1), i);
}
writeChar('}', ostr);
}
@ -221,10 +238,13 @@ void SerializationMap::serializeText(const IColumn & column, size_t row_num, Wri
{
auto writer = [&settings](WriteBuffer & buf, const SerializationPtr & subcolumn_serialization, const IColumn & subcolumn, size_t pos)
{
subcolumn_serialization->serializeTextQuoted(subcolumn, pos, buf, settings);
if (settings.composed_data_type_output_format_mode == "spark")
subcolumn_serialization->serializeText(subcolumn, pos, buf, settings);
else
subcolumn_serialization->serializeTextQuoted(subcolumn, pos, buf, settings);
};
serializeTextImpl(column, row_num, ostr, writer, writer);
serializeTextImpl(column, row_num, settings, ostr, writer, writer);
}
void SerializationMap::deserializeText(IColumn & column, ReadBuffer & istr, const FormatSettings & settings, bool whole) const
@ -266,7 +286,7 @@ bool SerializationMap::tryDeserializeText(IColumn & column, ReadBuffer & istr, c
void SerializationMap::serializeTextJSON(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
serializeTextImpl(column, row_num, ostr,
serializeTextImpl(column, row_num, settings, ostr,
[&settings](WriteBuffer & buf, const SerializationPtr & subcolumn_serialization, const IColumn & subcolumn, size_t pos)
{
/// We need to double-quote all keys (including integers) to produce valid JSON.

View File

@ -70,7 +70,7 @@ public:
private:
template <typename KeyWriter, typename ValueWriter>
void serializeTextImpl(const IColumn & column, size_t row_num, WriteBuffer & ostr, KeyWriter && key_writer, ValueWriter && value_writer) const;
void serializeTextImpl(const IColumn & column, size_t row_num, const FormatSettings & settings, WriteBuffer & ostr, KeyWriter && key_writer, ValueWriter && value_writer) const;
template <typename ReturnType = void, typename Reader>
ReturnType deserializeTextImpl(IColumn & column, ReadBuffer & istr, Reader && reader) const;

View File

@ -137,12 +137,25 @@ void SerializationTuple::deserializeBinary(IColumn & column, ReadBuffer & istr,
void SerializationTuple::serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
{
writeChar('(', ostr);
for (size_t i = 0; i < elems.size(); ++i)
if (!elems.empty())
{
if (i != 0)
writeChar(',', ostr);
elems[i]->serializeTextQuoted(extractElementColumn(column, i), row_num, ostr, settings);
if (settings.composed_data_type_output_format_mode == "spark")
elems[0]->serializeText(extractElementColumn(column, 0), row_num, ostr, settings);
else
elems[0]->serializeTextQuoted(extractElementColumn(column, 0), row_num, ostr, settings);
}
if (settings.composed_data_type_output_format_mode == "spark")
for (size_t i = 1; i < elems.size(); ++i)
{
writeString(std::string_view(", "), ostr);
elems[i]->serializeText(extractElementColumn(column, i), row_num, ostr, settings);
}
else
for (size_t i = 1; i < elems.size(); ++i)
{
writeChar(',', ostr);
elems[i]->serializeTextQuoted(extractElementColumn(column, i), row_num, ostr, settings);
}
writeChar(')', ostr);
}

View File

@ -251,6 +251,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
format_settings.values.deduce_templates_of_expressions = settings[Setting::input_format_values_deduce_templates_of_expressions];
format_settings.values.interpret_expressions = settings[Setting::input_format_values_interpret_expressions];
format_settings.values.escape_quote_with_quote = settings[Setting::output_format_values_escape_quote_with_quote];
format_settings.composed_data_type_output_format_mode = settings[Setting::composed_data_type_output_format_mode];
format_settings.with_names_use_header = settings[Setting::input_format_with_names_use_header];
format_settings.with_types_use_header = settings[Setting::input_format_with_types_use_header];
format_settings.write_statistics = settings[Setting::output_format_write_statistics];

View File

@ -38,6 +38,7 @@ struct FormatSettings
bool try_infer_variant = false;
bool seekable_read = true;
String composed_data_type_output_format_mode = "default";
UInt64 max_rows_to_read_for_schema_inference = 25000;
UInt64 max_bytes_to_read_for_schema_inference = 32 * 1024 * 1024;

View File

@ -440,7 +440,7 @@ ColumnPtr Set::execute(const ColumnsWithTypeAndName & columns, bool negative) co
}
// If the original column is DateTime64, check for sub-second precision
if (isDateTime64(column_to_cast.column->getDataType()))
if (isDateTime64(column_to_cast.column->getDataType()) && !isDateTime64(removeNullable(result)->getDataType()))
{
processDateTime64Column(column_to_cast, result, null_map_holder, null_map);
}

View File

@ -1540,7 +1540,7 @@ std::pair<ASTPtr, BlockIO> executeQuery(
? getIdentifierName(ast_query_with_output->format)
: context->getDefaultFormat();
if (format_name == "Null")
if (boost::iequals(format_name, "Null"))
res.null_format = true;
}

View File

@ -142,7 +142,7 @@ size_t TokenInfo::getTotalSize() const
return size + parts.size() - 1;
}
#ifdef ABORT_ON_LOGICAL_ERROR
#ifdef DEBUG_OR_SANITIZER_BUILD
void CheckTokenTransform::transform(Chunk & chunk)
{
auto token_info = chunk.getChunkInfos().get<TokenInfo>();

View File

@ -93,7 +93,7 @@ namespace DeduplicationToken
};
#ifdef ABORT_ON_LOGICAL_ERROR
#ifdef DEBUG_OR_SANITIZER_BUILD
/// use that class only with debug builds in CI for introspection
class CheckTokenTransform : public ISimpleTransform
{

View File

@ -381,7 +381,7 @@ std::optional<Chain> generateViewChain(
table_prefers_large_blocks ? settings[Setting::min_insert_block_size_bytes] : 0ULL));
}
#ifdef ABORT_ON_LOGICAL_ERROR
#ifdef DEBUG_OR_SANITIZER_BUILD
out.addSource(std::make_shared<DeduplicationToken::CheckTokenTransform>("Before squashing", out.getInputHeader()));
#endif
@ -427,7 +427,7 @@ std::optional<Chain> generateViewChain(
if (type == QueryViewsLogElement::ViewType::MATERIALIZED)
{
#ifdef ABORT_ON_LOGICAL_ERROR
#ifdef DEBUG_OR_SANITIZER_BUILD
out.addSource(std::make_shared<DeduplicationToken::CheckTokenTransform>("Right after Inner query", out.getInputHeader()));
#endif
@ -450,7 +450,7 @@ std::optional<Chain> generateViewChain(
}
#ifdef ABORT_ON_LOGICAL_ERROR
#ifdef DEBUG_OR_SANITIZER_BUILD
out.addSource(std::make_shared<DeduplicationToken::CheckTokenTransform>("Right before Inner query", out.getInputHeader()));
#endif
}

View File

@ -364,7 +364,7 @@ void RefreshTask::refreshTask()
if (coordination.root_znode.last_attempt_replica == coordination.replica_name)
{
LOG_ERROR(log, "Znode {} indicates that this replica is running a refresh, but it isn't. Likely a bug.", coordination.path + "/running");
#ifdef ABORT_ON_LOGICAL_ERROR
#ifdef DEBUG_OR_SANITIZER_BUILD
abortOnFailedAssertion("Unexpected refresh lock in keeper");
#else
coordination.running_znode_exists = false;

View File

@ -88,7 +88,8 @@ void FutureMergedMutatedPart::assign(MergeTreeData::DataPartsVector parts_, Merg
void FutureMergedMutatedPart::updatePath(const MergeTreeData & storage, const IReservation * reservation)
{
path = storage.getFullPathOnDisk(reservation->getDisk()) + name + "/";
path = fs::path(storage.getFullPathOnDisk(reservation->getDisk())) / name;
path += "/";
}
}

View File

@ -387,7 +387,7 @@ IMergeTreeDataPart::IndexPtr IMergeTreeDataPart::getIndex() const
IMergeTreeDataPart::IndexPtr IMergeTreeDataPart::loadIndexToCache(PrimaryIndexCache & index_cache) const
{
auto key = PrimaryIndexCache::hash(getDataPartStorage().getFullPath());
auto key = PrimaryIndexCache::hash(getRelativePathOfActivePart());
auto callback = [this] { return loadIndex(); };
return index_cache.getOrSet(key, callback);
}
@ -398,7 +398,7 @@ void IMergeTreeDataPart::moveIndexToCache(PrimaryIndexCache & index_cache)
if (!index)
return;
auto key = PrimaryIndexCache::hash(getDataPartStorage().getFullPath());
auto key = PrimaryIndexCache::hash(getRelativePathOfActivePart());
index_cache.set(key, std::const_pointer_cast<Index>(index));
index.reset();
@ -406,6 +406,15 @@ void IMergeTreeDataPart::moveIndexToCache(PrimaryIndexCache & index_cache)
projection->moveIndexToCache(index_cache);
}
void IMergeTreeDataPart::removeIndexFromCache(PrimaryIndexCache * index_cache) const
{
if (!index_cache)
return;
auto key = PrimaryIndexCache::hash(getRelativePathOfActivePart());
index_cache->remove(key);
}
void IMergeTreeDataPart::setIndex(Columns index_columns)
{
std::scoped_lock lock(index_mutex);
@ -574,17 +583,49 @@ bool IMergeTreeDataPart::isMovingPart() const
return part_directory_path.parent_path().filename() == "moving";
}
void IMergeTreeDataPart::clearCaches()
{
if (cleared_data_in_caches.exchange(true) || is_duplicate)
return;
size_t uncompressed_bytes = getBytesUncompressedOnDisk();
/// Remove index and marks from cache if it was prewarmed to avoid threshing it with outdated data.
/// Do not remove in other cases to avoid extra contention on caches.
removeMarksFromCache(storage.getMarkCacheToPrewarm(uncompressed_bytes).get());
removeIndexFromCache(storage.getPrimaryIndexCacheToPrewarm(uncompressed_bytes).get());
}
bool IMergeTreeDataPart::mayStoreDataInCaches() const
{
size_t uncompressed_bytes = getBytesUncompressedOnDisk();
auto mark_cache = storage.getMarkCacheToPrewarm(uncompressed_bytes);
auto index_cache = storage.getPrimaryIndexCacheToPrewarm(uncompressed_bytes);
return (mark_cache || index_cache) && !cleared_data_in_caches;
}
void IMergeTreeDataPart::removeIfNeeded() noexcept
{
assert(assertHasValidVersionMetadata());
if (!is_temp && state != MergeTreeDataPartState::DeleteOnDestroy)
return;
std::string path;
try
{
path = getDataPartStorage().getRelativePath();
clearCaches();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__, fmt::format("while removing part {} with path {}", name, path));
}
if (!is_temp && state != MergeTreeDataPartState::DeleteOnDestroy)
return;
try
{
if (!getDataPartStorage().exists()) // path
return;
@ -2113,6 +2154,11 @@ std::optional<String> IMergeTreeDataPart::getRelativePathForDetachedPart(const S
return {};
}
String IMergeTreeDataPart::getRelativePathOfActivePart() const
{
return fs::path(getDataPartStorage().getFullRootPath()) / name / "";
}
void IMergeTreeDataPart::renameToDetached(const String & prefix)
{
auto path_to_detach = getRelativePathForDetachedPart(prefix, /* broken */ false);

View File

@ -1,5 +1,6 @@
#pragma once
#include <atomic>
#include <unordered_map>
#include <IO/WriteSettings.h>
#include <Core/Block.h>
@ -185,6 +186,15 @@ public:
/// Loads marks and saves them into mark cache for specified columns.
virtual void loadMarksToCache(const Names & column_names, MarkCache * mark_cache) const = 0;
/// Removes marks from cache for all columns in part.
virtual void removeMarksFromCache(MarkCache * mark_cache) const = 0;
/// Removes data related to data part from mark and primary index caches.
void clearCaches();
/// Returns true if data related to data part may be stored in mark and primary index caches.
bool mayStoreDataInCaches() const;
String getMarksFileExtension() const { return index_granularity_info.mark_type.getFileExtension(); }
/// Generate the new name for this part according to `new_part_info` and min/max dates from the old name.
@ -376,6 +386,7 @@ public:
IndexPtr getIndex() const;
IndexPtr loadIndexToCache(PrimaryIndexCache & index_cache) const;
void moveIndexToCache(PrimaryIndexCache & index_cache);
void removeIndexFromCache(PrimaryIndexCache * index_cache) const;
void setIndex(Columns index_columns);
void unloadIndex();
@ -436,6 +447,10 @@ public:
std::optional<String> getRelativePathForPrefix(const String & prefix, bool detached = false, bool broken = false) const;
/// This method ignores current tmp prefix of part and returns
/// the name of part when it was or will be in Active state.
String getRelativePathOfActivePart() const;
bool isProjectionPart() const { return parent_part != nullptr; }
/// Check if the part is in the `/moving` directory
@ -757,6 +772,9 @@ private:
/// This ugly flag is needed for debug assertions only
mutable bool part_is_probably_removed_from_disk = false;
/// If it's true then data related to this part is cleared from mark and index caches.
mutable std::atomic_bool cleared_data_in_caches = false;
};
using MergeTreeDataPartPtr = std::shared_ptr<const IMergeTreeDataPart>;

View File

@ -120,10 +120,4 @@ NameSet IMergedBlockOutputStream::removeEmptyColumnsFromPart(
return remove_files;
}
PlainMarksByName IMergedBlockOutputStream::releaseCachedMarks()
{
if (!writer)
return {};
return writer->releaseCachedMarks();
}
}

View File

@ -35,7 +35,10 @@ public:
return writer->getIndexGranularity();
}
PlainMarksByName releaseCachedMarks();
PlainMarksByName releaseCachedMarks()
{
return writer ? writer->releaseCachedMarks() : PlainMarksByName{};
}
size_t getNumberOfOpenStreams() const
{
@ -43,7 +46,6 @@ public:
}
protected:
/// Remove all columns marked expired in data_part. Also, clears checksums
/// and columns array. Return set of removed files names.
NameSet removeEmptyColumnsFromPart(

View File

@ -445,12 +445,14 @@ bool MergeFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWrite
finish_callback = [storage_ptr = &storage]() { storage_ptr->merge_selecting_task->schedule(); };
ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges);
if (auto mark_cache = storage.getMarkCacheToPrewarm())
size_t bytes_uncompressed = part->getBytesUncompressedOnDisk();
if (auto mark_cache = storage.getMarkCacheToPrewarm(bytes_uncompressed))
addMarksToCache(*part, cached_marks, mark_cache.get());
/// Move index to cache and reset it here because we need
/// a correct part name after rename for a key of cache entry.
if (auto index_cache = storage.getPrimaryIndexCacheToPrewarm())
if (auto index_cache = storage.getPrimaryIndexCacheToPrewarm(bytes_uncompressed))
part->moveIndexToCache(*index_cache);
write_part_log({});

View File

@ -152,13 +152,15 @@ void MergePlainMergeTreeTask::finish()
ThreadFuzzer::maybeInjectSleep();
ThreadFuzzer::maybeInjectMemoryLimitException();
if (auto mark_cache = storage.getMarkCacheToPrewarm())
size_t bytes_uncompressed = new_part->getBytesUncompressedOnDisk();
if (auto mark_cache = storage.getMarkCacheToPrewarm(bytes_uncompressed))
{
auto marks = merge_task->releaseCachedMarks();
addMarksToCache(*new_part, marks, mark_cache.get());
}
if (auto index_cache = storage.getPrimaryIndexCacheToPrewarm())
if (auto index_cache = storage.getPrimaryIndexCacheToPrewarm(bytes_uncompressed))
{
/// Move index to cache and reset it here because we need
/// a correct part name after rename for a key of cache entry.

View File

@ -578,6 +578,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const
ctx->compression_codec,
std::move(index_granularity_ptr),
global_ctx->txn ? global_ctx->txn->tid : Tx::PrehistoricTID,
global_ctx->merge_list_element_ptr->total_size_bytes_compressed,
/*reset_columns=*/ true,
ctx->blocks_are_granules_size,
global_ctx->context->getWriteSettings());
@ -1125,6 +1126,7 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
getStatisticsForColumns(columns_list, global_ctx->metadata_snapshot),
ctx->compression_codec,
global_ctx->to->getIndexGranularity(),
global_ctx->merge_list_element_ptr->total_size_bytes_uncompressed,
&global_ctx->written_offset_columns);
ctx->column_elems_written = 0;

View File

@ -238,6 +238,7 @@ namespace MergeTreeSetting
extern const MergeTreeSettingsBool prewarm_mark_cache;
extern const MergeTreeSettingsBool primary_key_lazy_load;
extern const MergeTreeSettingsBool enforce_index_structure_match_on_partition_manipulation;
extern const MergeTreeSettingsUInt64 min_bytes_to_prewarm_caches;
}
namespace ServerSetting
@ -2366,19 +2367,31 @@ PrimaryIndexCachePtr MergeTreeData::getPrimaryIndexCache() const
return getContext()->getPrimaryIndexCache();
}
PrimaryIndexCachePtr MergeTreeData::getPrimaryIndexCacheToPrewarm() const
PrimaryIndexCachePtr MergeTreeData::getPrimaryIndexCacheToPrewarm(size_t part_uncompressed_bytes) const
{
if (!(*getSettings())[MergeTreeSetting::prewarm_primary_key_cache])
return nullptr;
/// Do not load data to caches for small parts because
/// they will be likely replaced by merge immediately.
size_t min_bytes_to_prewarm = (*getSettings())[MergeTreeSetting::min_bytes_to_prewarm_caches];
if (part_uncompressed_bytes && part_uncompressed_bytes < min_bytes_to_prewarm)
return nullptr;
return getPrimaryIndexCache();
}
MarkCachePtr MergeTreeData::getMarkCacheToPrewarm() const
MarkCachePtr MergeTreeData::getMarkCacheToPrewarm(size_t part_uncompressed_bytes) const
{
if (!(*getSettings())[MergeTreeSetting::prewarm_mark_cache])
return nullptr;
/// Do not load data to caches for small parts because
/// they will be likely replaced by merge immediately.
size_t min_bytes_to_prewarm = (*getSettings())[MergeTreeSetting::min_bytes_to_prewarm_caches];
if (part_uncompressed_bytes && part_uncompressed_bytes < min_bytes_to_prewarm)
return nullptr;
return getContext()->getMarkCache();
}
@ -9035,7 +9048,8 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::createE
ColumnsStatistics{},
compression_codec,
std::make_shared<MergeTreeIndexGranularityAdaptive>(),
txn ? txn->tid : Tx::PrehistoricTID);
txn ? txn->tid : Tx::PrehistoricTID,
/*part_uncompressed_bytes=*/ 0);
bool sync_on_insert = (*settings)[MergeTreeSetting::fsync_after_insert];
@ -9108,14 +9122,14 @@ void MergeTreeData::unloadPrimaryKeys()
}
}
size_t MergeTreeData::unloadPrimaryKeysOfOutdatedParts()
size_t MergeTreeData::unloadPrimaryKeysAndClearCachesOfOutdatedParts()
{
/// If the method is already called from another thread, then we don't need to do anything.
std::unique_lock lock(unload_primary_key_mutex, std::defer_lock);
if (!lock.try_lock())
return 0;
DataPartsVector parts_to_unload_index;
DataPartsVector parts_to_clear;
{
auto parts_lock = lockParts();
@ -9126,18 +9140,22 @@ size_t MergeTreeData::unloadPrimaryKeysOfOutdatedParts()
/// Outdated part may be hold by SELECT query and still needs the index.
/// This check requires lock of index_mutex but if outdated part is unique then there is no
/// contention on it, so it's relatively cheap and it's ok to check under a global parts lock.
if (isSharedPtrUnique(part) && part->isIndexLoaded())
parts_to_unload_index.push_back(part);
if (isSharedPtrUnique(part) && (part->isIndexLoaded() || part->mayStoreDataInCaches()))
parts_to_clear.push_back(part);
}
}
for (const auto & part : parts_to_unload_index)
for (const auto & part : parts_to_clear)
{
const_cast<IMergeTreeDataPart &>(*part).unloadIndex();
auto & part_mut = const_cast<IMergeTreeDataPart &>(*part);
part_mut.unloadIndex();
part_mut.clearCaches();
LOG_TEST(log, "Unloaded primary key for outdated part {}", part->name);
}
return parts_to_unload_index.size();
return parts_to_clear.size();
}
void MergeTreeData::verifySortingKey(const KeyDescription & sorting_key)

View File

@ -511,9 +511,9 @@ public:
/// Returns a pointer to primary index cache if it is enabled.
PrimaryIndexCachePtr getPrimaryIndexCache() const;
/// Returns a pointer to primary index cache if it is enabled and required to be prewarmed.
PrimaryIndexCachePtr getPrimaryIndexCacheToPrewarm() const;
PrimaryIndexCachePtr getPrimaryIndexCacheToPrewarm(size_t part_uncompressed_bytes) const;
/// Returns a pointer to primary mark cache if it is required to be prewarmed.
MarkCachePtr getMarkCacheToPrewarm() const;
MarkCachePtr getMarkCacheToPrewarm(size_t part_uncompressed_bytes) const;
/// Prewarm mark cache and primary index cache for the most recent data parts.
void prewarmCaches(ThreadPool & pool, MarkCachePtr mark_cache, PrimaryIndexCachePtr index_cache);
@ -1166,7 +1166,7 @@ public:
/// Unloads primary keys of outdated parts that are not used by any query.
/// Returns the number of parts for which index was unloaded.
size_t unloadPrimaryKeysOfOutdatedParts();
size_t unloadPrimaryKeysAndClearCachesOfOutdatedParts();
protected:
friend class IMergeTreeDataPart;
@ -1335,7 +1335,7 @@ protected:
std::mutex grab_old_parts_mutex;
/// The same for clearOldTemporaryDirectories.
std::mutex clear_old_temporary_directories_mutex;
/// The same for unloadPrimaryKeysOfOutdatedParts.
/// The same for unloadPrimaryKeysAndClearCachesOfOutdatedParts.
std::mutex unload_primary_key_mutex;
void checkProperties(

View File

@ -175,6 +175,16 @@ void MergeTreeDataPartCompact::loadMarksToCache(const Names & column_names, Mark
loader.loadMarks();
}
void MergeTreeDataPartCompact::removeMarksFromCache(MarkCache * mark_cache) const
{
if (!mark_cache)
return;
auto mark_path = index_granularity_info.getMarksFilePath(DATA_FILE_NAME);
auto key = MarkCache::hash(fs::path(getRelativePathOfActivePart()) / mark_path);
mark_cache->remove(key);
}
bool MergeTreeDataPartCompact::hasColumnFiles(const NameAndTypePair & column) const
{
if (!getColumnPosition(column.getNameInStorage()))

View File

@ -55,6 +55,7 @@ public:
std::optional<String> getFileNameForColumn(const NameAndTypePair & /* column */) const override { return DATA_FILE_NAME; }
void loadMarksToCache(const Names & column_names, MarkCache * mark_cache) const override;
void removeMarksFromCache(MarkCache * mark_cache) const override;
~MergeTreeDataPartCompact() override;

View File

@ -244,6 +244,27 @@ void MergeTreeDataPartWide::loadMarksToCache(const Names & column_names, MarkCac
loader->loadMarks();
}
void MergeTreeDataPartWide::removeMarksFromCache(MarkCache * mark_cache) const
{
if (!mark_cache)
return;
const auto & serializations = getSerializations();
for (const auto & [column_name, serialization] : serializations)
{
serialization->enumerateStreams([&](const auto & subpath)
{
auto stream_name = getStreamNameForColumn(column_name, subpath, checksums);
if (!stream_name)
return;
auto mark_path = index_granularity_info.getMarksFilePath(*stream_name);
auto key = MarkCache::hash(fs::path(getRelativePathOfActivePart()) / mark_path);
mark_cache->remove(key);
});
}
}
bool MergeTreeDataPartWide::isStoredOnRemoteDisk() const
{
return getDataPartStorage().isStoredOnRemoteDisk();

View File

@ -52,6 +52,7 @@ public:
std::optional<time_t> getColumnModificationTime(const String & column_name) const override;
void loadMarksToCache(const Names & column_names, MarkCache * mark_cache) const override;
void removeMarksFromCache(MarkCache * mark_cache) const override;
protected:
static void loadIndexGranularityImpl(

View File

@ -225,12 +225,13 @@ void MergeTreeDataWriter::TemporaryPart::finalize()
projection->getDataPartStorage().precommitTransaction();
}
/// This method must be called after rename and commit of part
/// because a correct path is required for the keys of caches.
void MergeTreeDataWriter::TemporaryPart::prewarmCaches()
{
/// This method must be called after rename and commit of part
/// because a correct path is required for the keys of caches.
size_t bytes_uncompressed = part->getBytesUncompressedOnDisk();
if (auto mark_cache = part->storage.getMarkCacheToPrewarm())
if (auto mark_cache = part->storage.getMarkCacheToPrewarm(bytes_uncompressed))
{
for (const auto & stream : streams)
{
@ -239,7 +240,7 @@ void MergeTreeDataWriter::TemporaryPart::prewarmCaches()
}
}
if (auto index_cache = part->storage.getPrimaryIndexCacheToPrewarm())
if (auto index_cache = part->storage.getPrimaryIndexCacheToPrewarm(bytes_uncompressed))
{
/// Index was already set during writing. Now move it to cache.
part->moveIndexToCache(*index_cache);
@ -726,6 +727,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(
compression_codec,
std::move(index_granularity_ptr),
context->getCurrentTransaction() ? context->getCurrentTransaction()->tid : Tx::PrehistoricTID,
block.bytes(),
/*reset_columns=*/ false,
/*blocks_are_granules_size=*/ false,
context->getWriteSettings());
@ -880,6 +882,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
compression_codec,
std::move(index_granularity_ptr),
Tx::PrehistoricTID,
block.bytes(),
/*reset_columns=*/ false,
/*blocks_are_granules_size=*/ false,
data.getContext()->getWriteSettings());

View File

@ -16,6 +16,7 @@ namespace ProfileEvents
{
extern const Event WaitMarksLoadMicroseconds;
extern const Event BackgroundLoadingMarksTasks;
extern const Event LoadedMarksFiles;
extern const Event LoadedMarksCount;
extern const Event LoadedMarksMemoryBytes;
}
@ -203,6 +204,7 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
auto res = std::make_shared<MarksInCompressedFile>(plain_marks);
ProfileEvents::increment(ProfileEvents::LoadedMarksFiles);
ProfileEvents::increment(ProfileEvents::LoadedMarksCount, marks_count * num_columns_in_mark);
ProfileEvents::increment(ProfileEvents::LoadedMarksMemoryBytes, res->approximateMemoryUsage());
@ -264,7 +266,7 @@ void addMarksToCache(const IMergeTreeDataPart & part, const PlainMarksByName & c
for (const auto & [stream_name, marks] : cached_marks)
{
auto mark_path = part.index_granularity_info.getMarksFilePath(stream_name);
auto key = MarkCache::hash(fs::path(part.getDataPartStorage().getFullPath()) / mark_path);
auto key = MarkCache::hash(fs::path(part.getRelativePathOfActivePart()) / mark_path);
mark_cache->set(key, std::make_shared<MarksInCompressedFile>(*marks));
}
}

View File

@ -83,6 +83,9 @@ struct MergeTreeSettings;
/// Adds computed marks for part to the marks cache.
void addMarksToCache(const IMergeTreeDataPart & part, const PlainMarksByName & cached_marks, MarkCache * mark_cache);
/// Removes cached marks for all columns from part.
void removeMarksFromCache(const IMergeTreeDataPart & part, MarkCache * mark_cache);
/// Returns the list of columns suitable for prewarming of mark cache according to settings.
Names getColumnsToPrewarmMarks(const MergeTreeSettings & settings, const NamesAndTypesList & columns_list);

View File

@ -244,6 +244,7 @@ namespace ErrorCodes
DECLARE(Bool, prewarm_primary_key_cache, false, "If true primary index cache will be prewarmed by saving marks to mark cache on inserts, merges, fetches and on startup of server", 0) \
DECLARE(Bool, prewarm_mark_cache, false, "If true mark cache will be prewarmed by saving marks to mark cache on inserts, merges, fetches and on startup of server", 0) \
DECLARE(String, columns_to_prewarm_mark_cache, "", "List of columns to prewarm mark cache for (if enabled). Empty means all columns", 0) \
DECLARE(UInt64, min_bytes_to_prewarm_caches, 0, "Minimal size (uncomressed bytes) to prewarm mark cache and primary index cache for new parts", 0) \
/** Projection settings. */ \
DECLARE(UInt64, max_projections, 25, "The maximum number of merge tree projections.", 0) \
DECLARE(LightweightMutationProjectionMode, lightweight_mutation_projection_mode, LightweightMutationProjectionMode::THROW, "When lightweight delete happens on a table with projection(s), the possible operations include throw the exception as projection exists, or drop projections of this table's relevant parts, or rebuild the projections.", 0) \

View File

@ -29,6 +29,7 @@ MergedBlockOutputStream::MergedBlockOutputStream(
CompressionCodecPtr default_codec_,
MergeTreeIndexGranularityPtr index_granularity_ptr,
TransactionID tid,
size_t part_uncompressed_bytes,
bool reset_columns_,
bool blocks_are_granules_size,
const WriteSettings & write_settings_)
@ -38,9 +39,9 @@ MergedBlockOutputStream::MergedBlockOutputStream(
, write_settings(write_settings_)
{
/// Save marks in memory if prewarm is enabled to avoid re-reading marks file.
bool save_marks_in_cache = data_part->storage.getMarkCacheToPrewarm() != nullptr;
bool save_marks_in_cache = data_part->storage.getMarkCacheToPrewarm(part_uncompressed_bytes) != nullptr;
/// Save primary index in memory if cache is disabled or is enabled with prewarm to avoid re-reading primary index file.
bool save_primary_index_in_memory = !data_part->storage.getPrimaryIndexCache() || data_part->storage.getPrimaryIndexCacheToPrewarm();
bool save_primary_index_in_memory = !data_part->storage.getPrimaryIndexCache() || data_part->storage.getPrimaryIndexCacheToPrewarm(part_uncompressed_bytes);
MergeTreeWriterSettings writer_settings(
data_part->storage.getContext()->getSettingsRef(),

View File

@ -24,6 +24,7 @@ public:
CompressionCodecPtr default_codec_,
MergeTreeIndexGranularityPtr index_granularity_ptr,
TransactionID tid,
size_t part_uncompressed_bytes,
bool reset_columns_ = false,
bool blocks_are_granules_size = false,
const WriteSettings & write_settings = {});

View File

@ -21,13 +21,14 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
const ColumnsStatistics & stats_to_recalc,
CompressionCodecPtr default_codec,
MergeTreeIndexGranularityPtr index_granularity_ptr,
size_t part_uncompressed_bytes,
WrittenOffsetColumns * offset_columns)
: IMergedBlockOutputStream(data_part->storage.getSettings(), data_part->getDataPartStoragePtr(), metadata_snapshot_, columns_list_, /*reset_columns=*/ true)
{
/// Save marks in memory if prewarm is enabled to avoid re-reading marks file.
bool save_marks_in_cache = data_part->storage.getMarkCacheToPrewarm() != nullptr;
bool save_marks_in_cache = data_part->storage.getMarkCacheToPrewarm(part_uncompressed_bytes) != nullptr;
/// Save primary index in memory if cache is disabled or is enabled with prewarm to avoid re-reading priamry index file.
bool save_primary_index_in_memory = !data_part->storage.getPrimaryIndexCache() || data_part->storage.getPrimaryIndexCacheToPrewarm();
bool save_primary_index_in_memory = !data_part->storage.getPrimaryIndexCache() || data_part->storage.getPrimaryIndexCacheToPrewarm(part_uncompressed_bytes);
/// Granularity is never recomputed while writing only columns.
MergeTreeWriterSettings writer_settings(

View File

@ -22,6 +22,7 @@ public:
const ColumnsStatistics & stats_to_recalc,
CompressionCodecPtr default_codec,
MergeTreeIndexGranularityPtr index_granularity_ptr,
size_t part_uncompressed_bytes,
WrittenOffsetColumns * offset_columns = nullptr);
void write(const Block & block) override;

View File

@ -1625,8 +1625,8 @@ private:
else
{
index_granularity_ptr = createMergeTreeIndexGranularity(
ctx->new_data_part->rows_count,
ctx->new_data_part->getBytesUncompressedOnDisk(),
ctx->source_part->rows_count,
ctx->source_part->getBytesUncompressedOnDisk(),
*ctx->data->getSettings(),
ctx->new_data_part->index_granularity_info,
/*blocks_are_granules=*/ false);
@ -1641,6 +1641,7 @@ private:
ctx->compression_codec,
std::move(index_granularity_ptr),
ctx->txn ? ctx->txn->tid : Tx::PrehistoricTID,
ctx->source_part->getBytesUncompressedOnDisk(),
/*reset_columns=*/ true,
/*blocks_are_granules_size=*/ false,
ctx->context->getWriteSettings());
@ -1876,7 +1877,8 @@ private:
std::vector<MergeTreeIndexPtr>(ctx->indices_to_recalc.begin(), ctx->indices_to_recalc.end()),
ColumnsStatistics(ctx->stats_to_recalc.begin(), ctx->stats_to_recalc.end()),
ctx->compression_codec,
ctx->source_part->index_granularity);
ctx->source_part->index_granularity,
ctx->source_part->getBytesUncompressedOnDisk());
ctx->mutating_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
ctx->mutating_pipeline.setProgressCallback(ctx->progress_callback);

View File

@ -193,7 +193,7 @@ Float32 ReplicatedMergeTreeCleanupThread::iterate()
cleaned_part_like += storage.clearEmptyParts();
}
cleaned_part_like += storage.unloadPrimaryKeysOfOutdatedParts();
cleaned_part_like += storage.unloadPrimaryKeysAndClearCachesOfOutdatedParts();
/// We need to measure the number of removed objects somehow (for better scheduling),
/// but just summing the number of removed async blocks, logs, and empty parts does not make any sense.

View File

@ -130,7 +130,6 @@ private:
std::unique_ptr<DelayedChunk> delayed_chunk;
void finishDelayedChunk(const ZooKeeperWithFaultInjectionPtr & zookeeper);
void prewarmCaches(const MergeTreeDataWriter::TemporaryPart & temp_part) const;
};
using ReplicatedMergeTreeSinkWithAsyncDeduplicate = ReplicatedMergeTreeSinkImpl<true>;

View File

@ -209,7 +209,7 @@ struct DeltaLakeMetadataImpl
if (!object)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to parse metadata file");
#ifdef ABORT_ON_LOGICAL_ERROR
#ifdef DEBUG_OR_SANITIZER_BUILD
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
object->stringify(oss);
LOG_TEST(log, "Metadata: {}", oss.str());

View File

@ -158,8 +158,8 @@ StorageMergeTree::StorageMergeTree(
prewarmCaches(
getActivePartsLoadingThreadPool().get(),
getMarkCacheToPrewarm(),
getPrimaryIndexCacheToPrewarm());
getMarkCacheToPrewarm(0),
getPrimaryIndexCacheToPrewarm(0));
}
@ -1522,7 +1522,7 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
cleared_count += clearOldPartsFromFilesystem();
cleared_count += clearOldMutations();
cleared_count += clearEmptyParts();
cleared_count += unloadPrimaryKeysOfOutdatedParts();
cleared_count += unloadPrimaryKeysAndClearCachesOfOutdatedParts();
return cleared_count;
/// TODO maybe take into account number of cleared objects when calculating backoff
}, common_assignee_trigger, getStorageID()), /* need_trigger */ false);

View File

@ -515,8 +515,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
prewarmCaches(
getActivePartsLoadingThreadPool().get(),
getMarkCacheToPrewarm(),
getPrimaryIndexCacheToPrewarm());
getMarkCacheToPrewarm(0),
getPrimaryIndexCacheToPrewarm(0));
if (LoadingStrictnessLevel::ATTACH <= mode)
{
@ -5089,13 +5089,15 @@ bool StorageReplicatedMergeTree::fetchPart(
ProfileEvents::increment(ProfileEvents::ObsoleteReplicatedParts);
}
if (auto mark_cache = getMarkCacheToPrewarm())
size_t bytes_uncompressed = part->getBytesUncompressedOnDisk();
if (auto mark_cache = getMarkCacheToPrewarm(bytes_uncompressed))
{
auto column_names = getColumnsToPrewarmMarks(*getSettings(), part->getColumns());
part->loadMarksToCache(column_names, mark_cache.get());
}
if (auto index_cache = getPrimaryIndexCacheToPrewarm())
if (auto index_cache = getPrimaryIndexCacheToPrewarm(bytes_uncompressed))
{
part->loadIndexToCache(*index_cache);
}

View File

@ -1596,7 +1596,7 @@ void StorageWindowView::writeIntoWindowView(
return std::make_shared<DeduplicationToken::SetViewBlockNumberTransform>(stream_header);
});
#ifdef ABORT_ON_LOGICAL_ERROR
#ifdef DEBUG_OR_SANITIZER_BUILD
builder.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<DeduplicationToken::CheckTokenTransform>("StorageWindowView: Afrer tmp table before squashing", stream_header);
@ -1643,7 +1643,7 @@ void StorageWindowView::writeIntoWindowView(
lateness_upper_bound);
});
#ifdef ABORT_ON_LOGICAL_ERROR
#ifdef DEBUG_OR_SANITIZER_BUILD
builder.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<DeduplicationToken::CheckTokenTransform>("StorageWindowView: Afrer WatermarkTransform", stream_header);
@ -1668,7 +1668,7 @@ void StorageWindowView::writeIntoWindowView(
builder.addSimpleTransform([&](const Block & header_) { return std::make_shared<ExpressionTransform>(header_, convert_actions); });
}
#ifdef ABORT_ON_LOGICAL_ERROR
#ifdef DEBUG_OR_SANITIZER_BUILD
builder.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<DeduplicationToken::CheckTokenTransform>("StorageWindowView: Before out", stream_header);

View File

@ -191,8 +191,9 @@ def test_invalid_snapshot(started_cluster):
]
)
node.start_clickhouse(start_wait_sec=120, expected_to_fail=True)
assert node.contains_in_log("Failure to load from latest snapshot with index")
assert node.contains_in_log(
"Aborting because of failure to load from latest snapshot with index"
"Manual intervention is necessary for recovery. Problematic snapshot can be removed but it will lead to data loss"
)
node.stop_clickhouse()

View File

@ -0,0 +1,16 @@
-- array format --
[\'1\']
[1, 2, abc, \'1\']
[1, 2, abc, \'1\']
[1, 2, abc, \'1\']
[1, 2, abc, \'1\']
-- map format --
{1343 -> fe, afe -> fefe}
{1343 -> fe, afe -> fefe}
{1343 -> fe, afe -> fefe}
{1343 -> fe, afe -> fefe}
-- tuple format --
(1, 3, abc)
(1, 3, abc)
(1, 3, abc)
(1, 3, abc)

View File

@ -0,0 +1,18 @@
SELECT '-- array format --';
SELECT CAST(array('\'1\'') , 'String') SETTINGS composed_data_type_output_format_mode = 'spark';
SELECT CAST([materialize('1'), '2', 'abc', '\'1\''], 'String') SETTINGS composed_data_type_output_format_mode = 'spark';
SELECT CAST([materialize('1'), materialize('2'), 'abc', '\'1\''], 'String') SETTINGS composed_data_type_output_format_mode = 'spark';
SELECT CAST([materialize('1'), materialize('2'), materialize('abc'), '\'1\''], 'String') SETTINGS composed_data_type_output_format_mode = 'spark';
SELECT CAST([materialize('1'), materialize('2'), materialize('abc'), materialize('\'1\'')], 'String') SETTINGS composed_data_type_output_format_mode = 'spark';
SELECT '-- map format --';
SELECT toString(map('1343', 'fe', 'afe', 'fefe')) SETTINGS composed_data_type_output_format_mode = 'spark';
SELECT toString(map(materialize('1343'), materialize('fe'), 'afe', 'fefe')) SETTINGS composed_data_type_output_format_mode = 'spark';
SELECT toString(map(materialize('1343'), materialize('fe'), materialize('afe'), 'fefe')) SETTINGS composed_data_type_output_format_mode = 'spark';
SELECT toString(map(materialize('1343'), materialize('fe'), materialize('afe'), materialize('fefe'))) SETTINGS composed_data_type_output_format_mode = 'spark';
SELECT '-- tuple format --';
SELECT toString(('1', '3', 'abc')) SETTINGS composed_data_type_output_format_mode = 'spark';
SELECT toString((materialize('1'), '3', 'abc')) SETTINGS composed_data_type_output_format_mode = 'spark';
SELECT toString((materialize('1'), materialize('3'), 'abc')) SETTINGS composed_data_type_output_format_mode = 'spark';
SELECT toString((materialize('1'), materialize('3'), materialize('abc'))) SETTINGS composed_data_type_output_format_mode = 'spark';

View File

@ -0,0 +1,17 @@
#!/usr/bin/env bash
set -e
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
query_id="03276_null_format_matching_case_insensitive_$RANDOM$RANDOM"
$CLICKHOUSE_CLIENT --query_id "$query_id" -q "select * from numbers_mt(1e8) format null settings max_rows_to_read=0"
$CLICKHOUSE_CLIENT -q "
SYSTEM FLUSH LOGS;
-- SendBytes should be close to 0, previously for this query it was around 800MB
select ProfileEvents['NetworkSendBytes'] < 1e6 from system.query_log where current_database = currentDatabase() and event_date >= yesterday() and query_id = '$query_id' and type = 'QueryFinish';
"

View File

@ -0,0 +1,2 @@
42
1 1

View File

@ -0,0 +1,9 @@
SELECT 42 SETTINGS log_comment='03277_logging_elapsed_ns';
SYSTEM FLUSH LOGS;
SELECT
ProfileEvents['LogDebug'] + ProfileEvents['LogTrace'] > 0,
ProfileEvents['LoggerElapsedNanoseconds'] > 0
FROM system.query_log
WHERE current_database = currentDatabase() AND log_comment = '03277_logging_elapsed_ns' AND type = 'QueryFinish';

View File

@ -0,0 +1,12 @@
MarkCacheFiles 3
PrimaryIndexCacheFiles 1
550
MarkCacheFiles 5
PrimaryIndexCacheFiles 2
550
MarkCacheFiles 3
PrimaryIndexCacheFiles 1
MarkCacheFiles 0
PrimaryIndexCacheFiles 0
2 1
0 0

View File

@ -0,0 +1,76 @@
#!/usr/bin/env bash
# Tags: no-parallel, no-random-merge-tree-settings
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT --query "
DROP TABLE IF EXISTS t_prewarm_cache_rmt_1;
CREATE TABLE t_prewarm_cache_rmt_1 (a UInt64, b UInt64, c UInt64)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/03277_prewarms_caches/t_prewarm_cache', '1')
ORDER BY a
SETTINGS
index_granularity = 100,
min_bytes_for_wide_part = 0,
use_primary_key_cache = 1,
prewarm_primary_key_cache = 1,
prewarm_mark_cache = 1,
max_cleanup_delay_period = 1,
cleanup_delay_period = 1,
min_bytes_to_prewarm_caches = 30000;
SYSTEM DROP MARK CACHE;
SYSTEM DROP PRIMARY INDEX CACHE;
INSERT INTO t_prewarm_cache_rmt_1 SELECT number, rand(), rand() FROM numbers(100, 100);
INSERT INTO t_prewarm_cache_rmt_1 SELECT number, rand(), rand() FROM numbers(1000, 2000);
SYSTEM RELOAD ASYNCHRONOUS METRICS;
SELECT metric, value FROM system.asynchronous_metrics WHERE metric IN ('PrimaryIndexCacheFiles', 'MarkCacheFiles') ORDER BY metric;
SELECT count() FROM t_prewarm_cache_rmt_1 WHERE a % 2 = 0 AND a >= 100 AND a < 2000 AND NOT ignore(a, b);
SYSTEM RELOAD ASYNCHRONOUS METRICS;
SELECT metric, value FROM system.asynchronous_metrics WHERE metric IN ('PrimaryIndexCacheFiles', 'MarkCacheFiles') ORDER BY metric;
SYSTEM DROP MARK CACHE;
SYSTEM DROP PRIMARY INDEX CACHE;
OPTIMIZE TABLE t_prewarm_cache_rmt_1 FINAL;
SELECT count() FROM t_prewarm_cache_rmt_1 WHERE a % 2 = 0 AND a >= 100 AND a < 2000 AND NOT ignore(a, b);
SYSTEM RELOAD ASYNCHRONOUS METRICS;
SELECT metric, value FROM system.asynchronous_metrics WHERE metric IN ('PrimaryIndexCacheFiles', 'MarkCacheFiles') ORDER BY metric;
TRUNCATE TABLE t_prewarm_cache_rmt_1;
"
for _ in {1..100}; do
res=$($CLICKHOUSE_CLIENT -q "
SYSTEM RELOAD ASYNCHRONOUS METRICS;
SELECT value FROM system.asynchronous_metrics WHERE metric = 'PrimaryIndexCacheFiles';
")
if [[ $res -eq 0 ]]; then
break
fi
sleep 0.3
done
$CLICKHOUSE_CLIENT --query "
SYSTEM RELOAD ASYNCHRONOUS METRICS;
SELECT metric, value FROM system.asynchronous_metrics WHERE metric IN ('PrimaryIndexCacheFiles', 'MarkCacheFiles') ORDER BY metric;
SYSTEM FLUSH LOGS;
SELECT
ProfileEvents['LoadedMarksFiles'],
ProfileEvents['LoadedPrimaryIndexFiles']
FROM system.query_log
WHERE current_database = currentDatabase() AND type = 'QueryFinish' AND query LIKE 'SELECT count() FROM t_prewarm_cache_rmt_1%'
ORDER BY event_time_microseconds;
DROP TABLE IF EXISTS t_prewarm_cache_rmt_1;
"

View File

@ -0,0 +1,2 @@
1 2001-01-11 01:11:21.100 2001-01-11 01:11:21.100
1 2001-01-11 01:11:21.100 2001-01-11 01:11:21.100

View File

@ -0,0 +1,9 @@
CREATE TABLE datetime64_issue (id int, dt DateTime64(3), dtn Nullable(DateTime64(3))) ENGINE = MergeTree() ORDER BY id PRIMARY KEY id;
INSERT INTO datetime64_issue(id, dt, dtn) VALUES (1, toDateTime64('2001-01-11 01:11:21.100', 3), toDateTime64('2001-01-11 01:11:21.100', 3));
SELECT * FROM datetime64_issue WHERE dt in (toDateTime64('2001-01-11 01:11:21.100', 3));
SELECT * FROM datetime64_issue WHERE dtn in (toDateTime64('2001-01-11 01:11:21.100', 3));
DROP TABLE datetime64_issue;

View File

@ -69,6 +69,7 @@ EXTERN_TYPES_EXCLUDES=(
ProfileEvents::end
ProfileEvents::increment
ProfileEvents::incrementForLogMessage
ProfileEvents::incrementLoggerElapsedNanoseconds
ProfileEvents::getName
ProfileEvents::Timer
ProfileEvents::Type