Merge branch 'master' into enable-coverage-for-debug-build

This commit is contained in:
Alexey Milovidov 2024-01-17 22:35:51 +01:00
commit c098f41dbc
154 changed files with 2743 additions and 472 deletions

2
contrib/avro vendored

@ -1 +1 @@
Subproject commit 2fb8a8a6ec0eab9109b68abf3b4857e8c476b918
Subproject commit d43acc84d3d455b016f847d6666fbc3cd27f16a9

View File

@ -44,12 +44,14 @@ set (SRCS_IOSTREAMS
"${LIBRARY_DIR}/libs/iostreams/src/gzip.cpp"
"${LIBRARY_DIR}/libs/iostreams/src/mapped_file.cpp"
"${LIBRARY_DIR}/libs/iostreams/src/zlib.cpp"
"${LIBRARY_DIR}/libs/iostreams/src/zstd.cpp"
)
add_library (_boost_iostreams ${SRCS_IOSTREAMS})
add_library (boost::iostreams ALIAS _boost_iostreams)
target_include_directories (_boost_iostreams PRIVATE ${LIBRARY_DIR})
target_link_libraries (_boost_iostreams PRIVATE ch_contrib::zlib)
target_link_libraries (_boost_iostreams PRIVATE ch_contrib::zstd)
# program_options

View File

@ -34,9 +34,9 @@ if (OS_LINUX)
# avoid spurious latencies and additional work associated with
# MADV_DONTNEED. See
# https://github.com/ClickHouse/ClickHouse/issues/11121 for motivation.
set (JEMALLOC_CONFIG_MALLOC_CONF "percpu_arena:percpu,oversize_threshold:0,muzzy_decay_ms:5000,dirty_decay_ms:5000")
set (JEMALLOC_CONFIG_MALLOC_CONF "percpu_arena:percpu,oversize_threshold:0,muzzy_decay_ms:0,dirty_decay_ms:5000")
else()
set (JEMALLOC_CONFIG_MALLOC_CONF "oversize_threshold:0,muzzy_decay_ms:5000,dirty_decay_ms:5000")
set (JEMALLOC_CONFIG_MALLOC_CONF "oversize_threshold:0,muzzy_decay_ms:0,dirty_decay_ms:5000")
endif()
# CACHE variable is empty to allow changing defaults without the necessity
# to purge cache

View File

@ -0,0 +1,207 @@
---
slug: /en/operations/allocation-profiling
sidebar_label: "Allocation profiling"
title: "Allocation profiling"
---
import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';
# Allocation profiling
ClickHouse uses [jemalloc](https://github.com/jemalloc/jemalloc) as its global allocator that comes with some tools for allocation sampling and profiling.
To make allocation profiling more convenient, `SYSTEM` commands are provided alongside 4LW commands in Keeper.
## Sampling allocations and flushing heap profiles
If we want to sample and profile allocations in `jemalloc`, we need to start ClickHouse/Keeper with profiling enabled using environment variable `MALLOC_CONF`.
```sh
MALLOC_CONF=background_thread:true,prof:true
```
`jemalloc` will sample allocations and store the information internally.
We can tell `jemalloc` to flush current profile by running:
<Tabs groupId="binary">
<TabItem value="clickhouse" label="ClickHouse">
SYSTEM JEMALLOC FLUSH PROFILE
</TabItem>
<TabItem value="keeper" label="Keeper">
echo jmfp | nc localhost 9181
</TabItem>
</Tabs>
By default, heap profile file will be generated in `/tmp/jemalloc_clickhouse._pid_._seqnum_.heap` where `_pid_` is the PID of ClickHouse and `_seqnum_` is the global sequence number for the current heap profile.
For Keeper, the default file is `/tmp/jemalloc_keeper._pid_._seqnum_.heap` following the same rules.
A different location can be defined by appending the `prof_prefix` option to the `MALLOC_CONF` environment variable.
For example, if we want to generate profiles in `/data` folder where the prefix for filename will be `my_current_profile` we can run ClickHouse/Keeper with following environment variable:
```sh
MALLOC_CONF=background_thread:true,prof:true,prof_prefix:/data/my_current_profile
```
The name of the generated file will consist of the prefix followed by the PID and the sequence number.
## Analyzing heap profiles
After we generated heap profiles, we need to analyze them.
For that, we need to use `jemalloc`'s tool called [jeprof](https://github.com/jemalloc/jemalloc/blob/dev/bin/jeprof.in) which can be installed in multiple ways:
- installing `jemalloc` using system's package manager
- cloning [jemalloc repo](https://github.com/jemalloc/jemalloc) and running autogen.sh from the root folder that will provide you with `jeprof` script inside the `bin` folder
:::note
`jeprof` uses `addr2line` to generate stacktraces which can be really slow.
If that's the case, we recommend installing an [alternative implementation](https://github.com/gimli-rs/addr2line) of the tool.
```
git clone https://github.com/gimli-rs/addr2line
cd addr2line
cargo b --examples -r
cp ./target/release/examples/addr2line path/to/current/addr2line
```
:::
There are many different formats to generate from the heap profile using `jeprof`.
We recommend running `jeprof --help` to check the usage and the many different options the tool provides.
In general, `jeprof` command will look like this:
```sh
jeprof path/to/binary path/to/heap/profile --output_format [ > output_file]
```
If we want to compare which allocations happened between 2 profiles we can set the base argument:
```sh
jeprof path/to/binary --base path/to/first/heap/profile path/to/second/heap/profile --output_format [ > output_file]
```
For example:
- if we want to generate a text file with each procedure written per line:
```sh
jeprof path/to/binary path/to/heap/profile --text > result.txt
```
- if we want to generate a PDF file with call-graph:
```sh
jeprof path/to/binary path/to/heap/profile --pdf > result.pdf
```
### Generating flame graph
`jeprof` allows us to generate collapsed stacks for building flame graphs.
We need to use `--collapsed` argument:
```sh
jeprof path/to/binary path/to/heap/profile --collapsed > result.collapsed
```
After that, we can use many different tools to visualize collapsed stacks.
Most popular would be [FlameGraph](https://github.com/brendangregg/FlameGraph) which contains a script called `flamegraph.pl`:
```sh
cat result.collapsed | /path/to/FlameGraph/flamegraph.pl --color=mem --title="Allocation Flame Graph" --width 2400 > result.svg
```
Another interesting tool is [speedscope](https://www.speedscope.app/) that allows you to analyze collected stacks in a more interactive way.
## Controlling allocation profiler during runtime
If ClickHouse/Keeper were started with the profiler enabled, they support additional commands for disabling/enabling allocation profiling during runtime.
Using those commands, it's easier to profile only specific intervals.
Disable profiler:
<Tabs groupId="binary">
<TabItem value="clickhouse" label="ClickHouse">
SYSTEM JEMALLOC DISABLE PROFILE
</TabItem>
<TabItem value="keeper" label="Keeper">
echo jmdp | nc localhost 9181
</TabItem>
</Tabs>
Enable profiler:
<Tabs groupId="binary">
<TabItem value="clickhouse" label="ClickHouse">
SYSTEM JEMALLOC ENABLE PROFILE
</TabItem>
<TabItem value="keeper" label="Keeper">
echo jmep | nc localhost 9181
</TabItem>
</Tabs>
It's also possible to control the initial state of the profiler by setting `prof_active` option which is enabled by default.
For example, if we don't want to sample allocations during startup but only after we enable the profiler, we can start ClickHouse/Keeper with following environment variable:
```sh
MALLOC_CONF=background_thread:true,prof:true,prof_active:false
```
and enable profiler at a later point.
## Additional options for profiler
`jemalloc` has many different options available related to profiler which can be controlled by modifying `MALLOC_CONF` environment variable.
For example, interval between allocation samples can be controlled with `lg_prof_sample`.
If you want to dump heap profile every N bytes you can enable it using `lg_prof_interval`.
We recommend checking `jemalloc`'s [reference page](https://jemalloc.net/jemalloc.3.html) for such options.
## Other resources
ClickHouse/Keeper expose `jemalloc` related metrics in many different ways.
:::warning Warning
It's important to be aware that none of these metrics are synchronized with each other and values may drift.
:::
### System table `asynchronous_metrics`
```sql
SELECT *
FROM system.asynchronous_metrics
WHERE metric ILIKE '%jemalloc%'
FORMAT Vertical
```
[Reference](/en/operations/system-tables/asynchronous_metrics)
### System table `jemalloc_bins`
Contains information about memory allocations done via jemalloc allocator in different size classes (bins) aggregated from all arenas.
[Reference](/en/operations/system-tables/jemalloc_bins)
### Prometheus
All `jemalloc` related metrics from `asynchronous_metrics` are also exposed using Prometheus endpoint in both ClickHouse and Keeper.
[Reference](/en/operations/server-configuration-parameters/settings#prometheus)
### `jmst` 4LW command in Keeper
Keeper supports `jmst` 4LW command which returns [basic allocator statistics](https://github.com/jemalloc/jemalloc/wiki/Use-Case%3A-Basic-Allocator-Statistics).
Example:
```sh
echo jmst | nc localhost 9181
```

View File

@ -65,6 +65,20 @@ With Cluster Discovery, rather than defining each node explicitly, you simply sp
<cluster_name>
<discovery>
<path>/clickhouse/discovery/cluster_name</path>
<!-- # Optional configuration parameters: -->
<!-- ## Authentication credentials to access all other nodes in cluster: -->
<!-- <user>user1</user> -->
<!-- <password>pass123</password> -->
<!-- ### Alternatively to password, interserver secret may be used: -->
<!-- <secret>secret123</secret> -->
<!-- ## Shard for current node (see below): -->
<!-- <shard>1</shard> -->
<!-- ## Observer mode (see below): -->
<!-- <observer/> -->
</discovery>
</cluster_name>
</remote_servers>

View File

@ -1597,7 +1597,13 @@ Result:
Use ANSI escape sequences to paint colors in Pretty formats.
Enabled by default.
Possible values:
- `0` — Disabled. Pretty formats do not use ANSI escape sequences.
- `1` — Enabled. Pretty formats will use ANSI escape sequences except for `NoEscapes` formats.
- `auto` — Enabled if `stdout` is a terminal except for `NoEscapes` formats.
Default value is `auto`.
### output_format_pretty_grid_charset {#output_format_pretty_grid_charset}

View File

@ -2796,6 +2796,17 @@ SELECT TOP 3 name, value FROM system.settings;
3. │ max_block_size │ 65505 │
└─────────────────────────┴─────────┘
```
### output_format_pretty_color {#output_format_pretty_color}
Включает/выключает управляющие последовательности ANSI в форматах Pretty.
Возможные значения:
- `0` — выключена. Не использует ANSI последовательности в форматах Pretty.
- `1` — включена. Использует ANSI последовательности, за исключением форматов `NoEscapes`.
- `auto` — включена, если `stdout` является терминалом, за исключением форматов `NoEscapes`.
Значение по умолчанию: `auto`
## system_events_show_zero_values {#system_events_show_zero_values}

View File

@ -1467,6 +1467,8 @@ try
global_context->reloadAuxiliaryZooKeepersConfigIfChanged(config);
global_context->reloadQueryMaskingRulesIfChanged(config);
std::lock_guard lock(servers_lock);
updateServers(*config, server_pool, async_metrics, servers, servers_to_start_before_tables);
}

View File

@ -3140,6 +3140,64 @@ QueryTreeNodePtr QueryAnalyzer::tryResolveIdentifierFromJoin(const IdentifierLoo
}
}
auto check_nested_column_not_in_using = [&join_using_column_name_to_column_node, &identifier_lookup](const QueryTreeNodePtr & node)
{
    /** In short: if an identifier was resolved into a function (`nested` or `getSubcolumn`) and one of the
      * columns passed to that function is listed in USING (so its type would have to be updated),
      * an error is thrown instead of trying to patch the types.
      *
      * Background. Nested columns and subcolumns may be resolved into functions rather than plain columns.
      * E.g. `t.t.t` can become `getSubcolumn(t, 't.t')` when `t` is a `Tuple`.
      * A `t` mentioned in USING is resolved from the JOIN itself and gets the supertype of the left and right columns,
      * whereas the `t` used as the `getSubcolumn` argument is still resolved from the table, so its type would need an update.
      *
      * Example:
      *
      * SELECT t.t FROM (
      *     SELECT ((1, 's'), 's') :: Tuple(t Tuple(t UInt32, s1 String), s1 String) as t
      * ) AS a FULL JOIN (
      *     SELECT ((1, 's'), 's') :: Tuple(t Tuple(t Int32, s2 String), s2 String) as t
      * ) AS b USING t;
      *
      * The resulting type of `t` is `Tuple(Tuple(Int64, String), String)` (another type, subcolumn names are lost),
      * so producing a correct type for `t.t`, which is resolved into getSubcolumn(t, 't'), is tricky.
      *
      * Nested subcolumns make it even harder. In the query
      *     SELECT t FROM ... JOIN ... USING (t.t)
      * `t` is resolved into the function `nested(['t', 's'], t.t, t.s)`, so `t.t` should come from the JOIN and `t.s` from the table.
      *
      * Adjusting the types in all these situations is rather complicated, therefore such queries are simply forbidden.
      *
      * It still may work for storages that can read subcolumns directly, without the `getSubcolumn` function:
      *     SELECT t, t.t, toTypeName(t), toTypeName(t.t) FROM t1 AS a FULL JOIN t2 AS b USING t.t;
      * This is supported on a best-effort basis: `t` keeps the original table type, while `t.t` gets the supertype from the JOIN.
      * It would probably be good to prohibit that as well, but it is unclear how to detect it in the general case.
      */

    /// Only function nodes are expected here; anything else is a bug in the caller.
    if (node->getNodeType() != QueryTreeNodeType::FUNCTION)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected node type {}, expected function node", node->getNodeType());

    const auto & arguments = node->as<FunctionNode &>().getArguments().getNodes();
    for (const auto & argument : arguments)
    {
        const auto argument_type = argument->getNodeType();

        /// Constants (e.g. the subcolumn name) never clash with USING columns.
        if (argument_type == QueryTreeNodeType::CONSTANT)
            continue;

        /// Besides constants, only plain columns are expected as arguments.
        if (argument_type != QueryTreeNodeType::COLUMN)
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected node type {} for argument node in {}",
                argument->getNodeType(), node->formatASTForErrorMessage());

        const auto & argument_column_name = argument->as<ColumnNode &>().getColumnName();
        if (join_using_column_name_to_column_node.contains(argument_column_name))
            throw Exception(ErrorCodes::AMBIGUOUS_IDENTIFIER,
                "Cannot select subcolumn for identifier '{}' while joining using column '{}'",
                identifier_lookup.identifier, argument_column_name);
    }
};
std::optional<JoinTableSide> resolved_side;
QueryTreeNodePtr resolved_identifier;
@ -3173,12 +3231,23 @@ QueryTreeNodePtr QueryAnalyzer::tryResolveIdentifierFromJoin(const IdentifierLoo
if (left_resolved_identifier && right_resolved_identifier)
{
auto & left_resolved_column = left_resolved_identifier->as<ColumnNode &>();
auto & right_resolved_column = right_resolved_identifier->as<ColumnNode &>();
auto using_column_node_it = join_using_column_name_to_column_node.end();
if (left_resolved_identifier->getNodeType() == QueryTreeNodeType::COLUMN && right_resolved_identifier->getNodeType() == QueryTreeNodeType::COLUMN)
{
auto & left_resolved_column = left_resolved_identifier->as<ColumnNode &>();
auto & right_resolved_column = right_resolved_identifier->as<ColumnNode &>();
if (left_resolved_column.getColumnName() == right_resolved_column.getColumnName())
using_column_node_it = join_using_column_name_to_column_node.find(left_resolved_column.getColumnName());
}
else
{
if (left_resolved_identifier->getNodeType() != QueryTreeNodeType::COLUMN)
check_nested_column_not_in_using(left_resolved_identifier);
if (right_resolved_identifier->getNodeType() != QueryTreeNodeType::COLUMN)
check_nested_column_not_in_using(right_resolved_identifier);
}
auto using_column_node_it = join_using_column_name_to_column_node.find(left_resolved_column.getColumnName());
if (using_column_node_it != join_using_column_name_to_column_node.end()
&& left_resolved_column.getColumnName() == right_resolved_column.getColumnName())
if (using_column_node_it != join_using_column_name_to_column_node.end())
{
JoinTableSide using_column_inner_column_table_side = isRight(join_kind) ? JoinTableSide::Right : JoinTableSide::Left;
auto & using_column_node = using_column_node_it->second->as<ColumnNode &>();
@ -3253,39 +3322,45 @@ QueryTreeNodePtr QueryAnalyzer::tryResolveIdentifierFromJoin(const IdentifierLoo
else if (left_resolved_identifier)
{
resolved_side = JoinTableSide::Left;
auto & left_resolved_column = left_resolved_identifier->as<ColumnNode &>();
resolved_identifier = left_resolved_identifier;
auto using_column_node_it = join_using_column_name_to_column_node.find(left_resolved_column.getColumnName());
if (using_column_node_it != join_using_column_name_to_column_node.end() &&
!using_column_node_it->second->getColumnType()->equals(*left_resolved_column.getColumnType()))
if (left_resolved_identifier->getNodeType() != QueryTreeNodeType::COLUMN)
{
auto left_resolved_column_clone = std::static_pointer_cast<ColumnNode>(left_resolved_column.clone());
left_resolved_column_clone->setColumnType(using_column_node_it->second->getColumnType());
resolved_identifier = std::move(left_resolved_column_clone);
check_nested_column_not_in_using(left_resolved_identifier);
}
else
{
resolved_identifier = left_resolved_identifier;
auto & left_resolved_column = left_resolved_identifier->as<ColumnNode &>();
auto using_column_node_it = join_using_column_name_to_column_node.find(left_resolved_column.getColumnName());
if (using_column_node_it != join_using_column_name_to_column_node.end() &&
!using_column_node_it->second->getColumnType()->equals(*left_resolved_column.getColumnType()))
{
auto left_resolved_column_clone = std::static_pointer_cast<ColumnNode>(left_resolved_column.clone());
left_resolved_column_clone->setColumnType(using_column_node_it->second->getColumnType());
resolved_identifier = std::move(left_resolved_column_clone);
}
}
}
else if (right_resolved_identifier)
{
resolved_side = JoinTableSide::Right;
auto & right_resolved_column = right_resolved_identifier->as<ColumnNode &>();
resolved_identifier = right_resolved_identifier;
auto using_column_node_it = join_using_column_name_to_column_node.find(right_resolved_column.getColumnName());
if (using_column_node_it != join_using_column_name_to_column_node.end() &&
!using_column_node_it->second->getColumnType()->equals(*right_resolved_column.getColumnType()))
if (right_resolved_identifier->getNodeType() != QueryTreeNodeType::COLUMN)
{
auto right_resolved_column_clone = std::static_pointer_cast<ColumnNode>(right_resolved_column.clone());
right_resolved_column_clone->setColumnType(using_column_node_it->second->getColumnType());
resolved_identifier = std::move(right_resolved_column_clone);
check_nested_column_not_in_using(right_resolved_identifier);
}
else
{
resolved_identifier = right_resolved_identifier;
auto & right_resolved_column = right_resolved_identifier->as<ColumnNode &>();
auto using_column_node_it = join_using_column_name_to_column_node.find(right_resolved_column.getColumnName());
if (using_column_node_it != join_using_column_name_to_column_node.end() &&
!using_column_node_it->second->getColumnType()->equals(*right_resolved_column.getColumnType()))
{
auto right_resolved_column_clone = std::static_pointer_cast<ColumnNode>(right_resolved_column.clone());
right_resolved_column_clone->setColumnType(using_column_node_it->second->getColumnType());
resolved_identifier = std::move(right_resolved_column_clone);
}
}
}

View File

@ -36,7 +36,7 @@ static constexpr size_t PRINT_MESSAGE_EACH_N_SECONDS = 5;
void logAboutProgress(Poco::Logger * log, size_t processed, size_t total, AtomicStopwatch & watch)
{
if (processed % PRINT_MESSAGE_EACH_N_OBJECTS == 0 || watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS))
if (total && (processed % PRINT_MESSAGE_EACH_N_OBJECTS == 0 || watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS)))
{
LOG_INFO(log, "Processed: {}%", processed * 100.0 / total);
watch.restart();

View File

@ -69,14 +69,14 @@ void handle_error_code([[maybe_unused]] const std::string & msg, int code, bool
Exception::MessageMasked::MessageMasked(const std::string & msg_)
: msg(msg_)
{
if (auto * masker = SensitiveDataMasker::getInstance())
if (auto masker = SensitiveDataMasker::getInstance())
masker->wipeSensitiveData(msg);
}
Exception::MessageMasked::MessageMasked(std::string && msg_)
: msg(std::move(msg_))
{
if (auto * masker = SensitiveDataMasker::getInstance())
if (auto masker = SensitiveDataMasker::getInstance())
masker->wipeSensitiveData(msg);
}

View File

@ -25,6 +25,9 @@ public:
/// Is the user allowed to write a new entry into the cache?
virtual bool approveWrite(const UUID & user_id, size_t entry_size_in_bytes) const = 0;
/// Clears the policy contents
virtual void clear() = 0;
virtual ~ICachePolicyUserQuota() = default;
};
@ -38,6 +41,7 @@ public:
void increaseActual(const UUID & /*user_id*/, size_t /*entry_size_in_bytes*/) override {}
void decreaseActual(const UUID & /*user_id*/, size_t /*entry_size_in_bytes*/) override {}
bool approveWrite(const UUID & /*user_id*/, size_t /*entry_size_in_bytes*/) const override { return true; }
void clear() override {}
};

View File

@ -85,20 +85,25 @@ public:
SensitiveDataMasker::~SensitiveDataMasker() = default;
std::unique_ptr<SensitiveDataMasker> SensitiveDataMasker::sensitive_data_masker = nullptr;
SensitiveDataMasker::MaskerMultiVersion SensitiveDataMasker::sensitive_data_masker{};
void SensitiveDataMasker::setInstance(std::unique_ptr<SensitiveDataMasker> sensitive_data_masker_)
void SensitiveDataMasker::setInstance(std::unique_ptr<SensitiveDataMasker>&& sensitive_data_masker_)
{
if (!sensitive_data_masker_)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: the 'sensitive_data_masker' is not set");
if (sensitive_data_masker_->rulesCount() > 0)
{
sensitive_data_masker = std::move(sensitive_data_masker_);
sensitive_data_masker.set(std::move(sensitive_data_masker_));
}
else
{
sensitive_data_masker.set(nullptr);
}
}
SensitiveDataMasker * SensitiveDataMasker::getInstance()
SensitiveDataMasker::MaskerMultiVersion::Version SensitiveDataMasker::getInstance()
{
return sensitive_data_masker.get();
}
@ -197,7 +202,7 @@ std::string wipeSensitiveDataAndCutToLength(const std::string & str, size_t max_
{
std::string res = str;
if (auto * masker = SensitiveDataMasker::getInstance())
if (auto masker = SensitiveDataMasker::getInstance())
masker->wipeSensitiveData(res);
size_t length = res.length();

View File

@ -2,7 +2,7 @@
#include <memory>
#include <vector>
#include <cstdint>
#include "Common/MultiVersion.h"
namespace Poco
{
@ -45,7 +45,8 @@ class SensitiveDataMasker
private:
class MaskingRule;
std::vector<std::unique_ptr<MaskingRule>> all_masking_rules;
static std::unique_ptr<SensitiveDataMasker> sensitive_data_masker;
using MaskerMultiVersion = MultiVersion<SensitiveDataMasker>;
static MaskerMultiVersion sensitive_data_masker;
public:
SensitiveDataMasker(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix);
@ -56,8 +57,8 @@ public:
/// setInstance is not thread-safe and should be called once in single-thread mode.
/// https://github.com/ClickHouse/ClickHouse/pull/6810#discussion_r321183367
static void setInstance(std::unique_ptr<SensitiveDataMasker> sensitive_data_masker_);
static SensitiveDataMasker * getInstance();
static void setInstance(std::unique_ptr<SensitiveDataMasker>&& sensitive_data_masker_);
static MaskerMultiVersion::Version getInstance();
/// Used in tests.
void addMaskingRule(const std::string & name, const std::string & regexp_string, const std::string & replacement_string);

View File

@ -32,6 +32,10 @@ std::atomic<bool> show_addresses = true;
bool shouldShowAddress(const void * addr)
{
/// Likely inline frame
if (!addr)
return false;
/// If the address is less than 4096, most likely it is a nullptr dereference with offset,
/// and showing this offset is secure nevertheless.
/// NOTE: 4096 is the page size on x86 and it can be different on other systems,
@ -203,20 +207,24 @@ static void * getCallerAddress(const ucontext_t & context)
#endif
}
// FIXME: looks like this is used only for Sentry but duplicates the whole algo, maybe replace?
void StackTrace::symbolize(
const StackTrace::FramePointers & frame_pointers, [[maybe_unused]] size_t offset, size_t size, StackTrace::Frames & frames)
void StackTrace::forEachFrame(
const StackTrace::FramePointers & frame_pointers,
size_t offset,
size_t size,
std::function<void(const Frame &)> callback,
bool fatal)
{
#if defined(__ELF__) && !defined(OS_FREEBSD)
const DB::SymbolIndex & symbol_index = DB::SymbolIndex::instance();
std::unordered_map<std::string, DB::Dwarf> dwarfs;
for (size_t i = 0; i < offset; ++i)
frames[i].virtual_addr = frame_pointers[i];
using enum DB::Dwarf::LocationInfoMode;
const auto mode = fatal ? FULL_WITH_INLINE : FAST;
for (size_t i = offset; i < size; ++i)
{
StackTrace::Frame & current_frame = frames[i];
StackTrace::Frame current_frame;
std::vector<DB::Dwarf::SymbolizedFrame> inline_frames;
current_frame.virtual_addr = frame_pointers[i];
const auto * object = symbol_index.findObject(current_frame.virtual_addr);
uintptr_t virtual_offset = object ? uintptr_t(object->address_begin) : 0;
@ -230,26 +238,41 @@ void StackTrace::symbolize(
auto dwarf_it = dwarfs.try_emplace(object->name, object->elf).first;
DB::Dwarf::LocationInfo location;
std::vector<DB::Dwarf::SymbolizedFrame> inline_frames;
if (dwarf_it->second.findAddress(
uintptr_t(current_frame.physical_addr), location, DB::Dwarf::LocationInfoMode::FAST, inline_frames))
uintptr_t(current_frame.physical_addr), location, mode, inline_frames))
{
current_frame.file = location.file.toString();
current_frame.line = location.line;
}
}
}
else
current_frame.object = "?";
if (const auto * symbol = symbol_index.findSymbol(current_frame.virtual_addr))
current_frame.symbol = demangle(symbol->name);
else
current_frame.symbol = "?";
for (const auto & frame : inline_frames)
{
StackTrace::Frame current_inline_frame;
const String file_for_inline_frame = frame.location.file.toString();
current_inline_frame.file = "inlined from " + file_for_inline_frame;
current_inline_frame.line = frame.location.line;
current_inline_frame.symbol = frame.name;
callback(current_inline_frame);
}
callback(current_frame);
}
#else
for (size_t i = 0; i < size; ++i)
frames[i].virtual_addr = frame_pointers[i];
UNUSED(fatal);
for (size_t i = offset; i < size; ++i)
{
StackTrace::Frame current_frame;
current_frame.virtual_addr = frame_pointers[i];
callback(current_frame);
}
#endif
}
@ -349,72 +372,52 @@ toStringEveryLineImpl([[maybe_unused]] bool fatal, const StackTraceRefTriple & s
if (stack_trace.size == 0)
return callback("<Empty trace>");
size_t frame_index = stack_trace.offset;
#if defined(__ELF__) && !defined(OS_FREEBSD)
using enum DB::Dwarf::LocationInfoMode;
const auto mode = fatal ? FULL_WITH_INLINE : FAST;
const DB::SymbolIndex & symbol_index = DB::SymbolIndex::instance();
std::unordered_map<String, DB::Dwarf> dwarfs;
for (size_t i = stack_trace.offset; i < stack_trace.size; ++i)
size_t inline_frame_index = 0;
auto callback_wrapper = [&](const StackTrace::Frame & frame)
{
std::vector<DB::Dwarf::SymbolizedFrame> inline_frames;
const void * virtual_addr = stack_trace.pointers[i];
const auto * object = symbol_index.findObject(virtual_addr);
uintptr_t virtual_offset = object ? uintptr_t(object->address_begin) : 0;
const void * physical_addr = reinterpret_cast<const void *>(uintptr_t(virtual_addr) - virtual_offset);
DB::WriteBufferFromOwnString out;
out << i << ". ";
String file;
if (std::error_code ec; object && std::filesystem::exists(object->name, ec) && !ec)
/// Inline frame
if (!frame.virtual_addr)
{
auto dwarf_it = dwarfs.try_emplace(object->name, object->elf).first;
DB::Dwarf::LocationInfo location;
if (dwarf_it->second.findAddress(uintptr_t(physical_addr), location, mode, inline_frames))
{
file = location.file.toString();
out << file << ":" << location.line << ": ";
}
out << frame_index << "." << inline_frame_index++ << ". ";
}
else
{
out << frame_index++ << ". ";
inline_frame_index = 0;
}
if (const auto * const symbol = symbol_index.findSymbol(virtual_addr))
out << demangleAndCollapseNames(file, symbol->name);
if (frame.file.has_value() && frame.line.has_value())
out << *frame.file << ':' << *frame.line << ": ";
if (frame.symbol.has_value() && frame.file.has_value())
out << demangleAndCollapseNames(*frame.file, frame.symbol->data());
else
out << "?";
if (shouldShowAddress(physical_addr))
if (shouldShowAddress(frame.physical_addr))
{
out << " @ ";
DB::writePointerHex(physical_addr, out);
DB::writePointerHex(frame.physical_addr, out);
}
out << " in " << (object ? object->name : "?");
for (size_t j = 0; j < inline_frames.size(); ++j)
{
const auto & frame = inline_frames[j];
const String file_for_inline_frame = frame.location.file.toString();
callback(fmt::format(
"{}.{}. inlined from {}:{}: {}",
i,
j + 1,
file_for_inline_frame,
frame.location.line,
demangleAndCollapseNames(file_for_inline_frame, frame.name)));
}
if (frame.object.has_value())
out << " in " << *frame.object;
callback(out.str());
}
};
#else
for (size_t i = stack_trace.offset; i < stack_trace.size; ++i)
if (const void * const addr = stack_trace.pointers[i]; shouldShowAddress(addr))
callback(fmt::format("{}. {}", i, addr));
auto callback_wrapper = [&](const StackTrace::Frame & frame)
{
if (frame.virtual_addr && shouldShowAddress(frame.virtual_addr))
callback(fmt::format("{}. {}", frame_index++, frame.virtual_addr));
};
#endif
StackTrace::forEachFrame(stack_trace.pointers, stack_trace.offset, stack_trace.size, callback_wrapper, fatal);
}
void StackTrace::toStringEveryLine(std::function<void(std::string_view)> callback) const

View File

@ -62,7 +62,14 @@ public:
static std::string toString(void ** frame_pointers, size_t offset, size_t size);
static void dropCache();
static void symbolize(const FramePointers & frame_pointers, size_t offset, size_t size, StackTrace::Frames & frames);
/// @param fatal - if true, will process inline frames (slower)
static void forEachFrame(
const FramePointers & frame_pointers,
size_t offset,
size_t size,
std::function<void(const Frame &)> callback,
bool fatal);
void toStringEveryLine(std::function<void(std::string_view)> callback) const;
static void toStringEveryLine(const FramePointers & frame_pointers, std::function<void(std::string_view)> callback);

View File

@ -38,12 +38,12 @@ public:
bool approveWrite(const UUID & user_id, size_t entry_size_in_bytes) const override
{
auto it_actual = actual.find(user_id);
Resources actual_for_user{.size_in_bytes = 0, .num_items = 0}; /// assume zero actual resource consumption is user isn't found
Resources actual_for_user{.size_in_bytes = 0, .num_items = 0}; /// if no user is found, the default is no resource consumption
if (it_actual != actual.end())
actual_for_user = it_actual->second;
auto it_quota = quotas.find(user_id);
Resources quota_for_user{.size_in_bytes = std::numeric_limits<size_t>::max(), .num_items = std::numeric_limits<size_t>::max()}; /// assume no threshold if no quota is found
Resources quota_for_user{.size_in_bytes = std::numeric_limits<size_t>::max(), .num_items = std::numeric_limits<size_t>::max()}; /// if no user is found, the default is no threshold
if (it_quota != quotas.end())
quota_for_user = it_quota->second;
@ -54,16 +54,21 @@ public:
quota_for_user.num_items = std::numeric_limits<UInt64>::max();
/// Check size quota
if (actual_for_user.size_in_bytes + entry_size_in_bytes >= quota_for_user.size_in_bytes)
if (actual_for_user.size_in_bytes + entry_size_in_bytes > quota_for_user.size_in_bytes)
return false;
/// Check items quota
if (quota_for_user.num_items + 1 >= quota_for_user.num_items)
if (actual_for_user.num_items + 1 > quota_for_user.num_items)
return false;
return true;
}
/// Forgets the consumption recorded in `actual` (looked up by user id in approveWrite()).
/// Only `actual` is cleared here; `quotas` is not touched by this method.
void clear() override
{
actual.clear();
}
struct Resources
{
size_t size_in_bytes = 0;
@ -125,6 +130,7 @@ public:
/// Removes every entry from `cache` and then calls clear() on the user quota policy
/// held in `Base::user_quotas`.
void clear() override
{
cache.clear();
Base::user_quotas->clear();
}
void remove(const Key & key) override

View File

@ -22,6 +22,7 @@ struct Keeper4LWInfo
bool is_standalone;
bool has_leader;
bool is_exceeding_mem_soft_limit;
uint64_t alive_connections_count;
uint64_t outstanding_requests_count;

View File

@ -28,8 +28,7 @@ void updateKeeperInformation(KeeperDispatcher & keeper_dispatcher, AsynchronousM
size_t zxid = 0;
size_t session_with_watches = 0;
size_t paths_watched = 0;
//size_t snapshot_dir_size = 0;
//size_t log_dir_size = 0;
size_t is_exceeding_mem_soft_limit = 0;
if (keeper_dispatcher.isServerActive())
{
@ -38,6 +37,7 @@ void updateKeeperInformation(KeeperDispatcher & keeper_dispatcher, AsynchronousM
is_leader = static_cast<size_t>(keeper_info.is_leader);
is_observer = static_cast<size_t>(keeper_info.is_observer);
is_follower = static_cast<size_t>(keeper_info.is_follower);
is_exceeding_mem_soft_limit = static_cast<size_t>(keeper_info.is_exceeding_mem_soft_limit);
zxid = keeper_info.last_zxid;
const auto & state_machine = keeper_dispatcher.getStateMachine();
@ -68,6 +68,7 @@ void updateKeeperInformation(KeeperDispatcher & keeper_dispatcher, AsynchronousM
new_values["KeeperIsFollower"] = { is_follower, "1 if ClickHouse Keeper is a follower, 0 otherwise." };
new_values["KeeperIsObserver"] = { is_observer, "1 if ClickHouse Keeper is an observer, 0 otherwise." };
new_values["KeeperIsStandalone"] = { is_standalone, "1 if ClickHouse Keeper is in a standalone mode, 0 otherwise." };
new_values["KeeperIsExceedingMemorySoftLimitHit"] = { is_exceeding_mem_soft_limit, "1 if ClickHouse Keeper is exceeding the memory soft limit, 0 otherwise." };
new_values["KeeperZnodeCount"] = { znode_count, "The number of nodes (data entries) in ClickHouse Keeper." };
new_values["KeeperWatchCount"] = { watch_count, "The number of watches in ClickHouse Keeper." };
@ -85,8 +86,6 @@ void updateKeeperInformation(KeeperDispatcher & keeper_dispatcher, AsynchronousM
new_values["KeeperZxid"] = { zxid, "The current transaction id number (zxid) in ClickHouse Keeper." };
new_values["KeeperSessionWithWatches"] = { session_with_watches, "The number of client sessions of ClickHouse Keeper having watches." };
new_values["KeeperPathsWatched"] = { paths_watched, "The number of different paths watched by the clients of ClickHouse Keeper." };
//new_values["KeeperSnapshotDirSize"] = { snapshot_dir_size, "The size of the snapshots directory of ClickHouse Keeper, in bytes." };
//new_values["KeeperLogDirSize"] = { log_dir_size, "The size of the logs directory of ClickHouse Keeper, in bytes." };
auto keeper_log_info = keeper_dispatcher.getKeeperLogInfo();

View File

@ -132,7 +132,7 @@ void KeeperDispatcher::requestThread()
break;
Int64 mem_soft_limit = keeper_context->getKeeperMemorySoftLimit();
if (configuration_and_settings->standalone_keeper && mem_soft_limit > 0 && total_memory_tracker.get() >= mem_soft_limit && checkIfRequestIncreaseMem(request.request))
if (configuration_and_settings->standalone_keeper && isExceedingMemorySoftLimit() && checkIfRequestIncreaseMem(request.request))
{
LOG_TRACE(log, "Processing requests refused because of max_memory_usage_soft_limit {}, the total used memory is {}, request type is {}", mem_soft_limit, total_memory_tracker.get(), request.request->getOpNum());
addErrorResponses({request}, Coordination::Error::ZCONNECTIONLOSS);

View File

@ -177,6 +177,11 @@ public:
return server->isObserver();
}
/// Forwards to the Keeper server: true when it reports that the memory soft limit is exceeded.
bool isExceedingMemorySoftLimit() const { return server->isExceedingMemorySoftLimit(); }
uint64_t getLogDirSize() const;
uint64_t getSnapDirSize() const;

View File

@ -548,6 +548,12 @@ bool KeeperServer::isLeaderAlive() const
return raft_instance && raft_instance->is_leader_alive();
}
bool KeeperServer::isExceedingMemorySoftLimit() const
{
Int64 mem_soft_limit = keeper_context->getKeeperMemorySoftLimit();
return mem_soft_limit > 0 && total_memory_tracker.get() >= mem_soft_limit;
}
/// TODO test whether taking failed peer in count
uint64_t KeeperServer::getFollowerCount() const
{
@ -1075,6 +1081,7 @@ Keeper4LWInfo KeeperServer::getPartiallyFilled4LWInfo() const
result.follower_count = getFollowerCount();
result.synced_follower_count = getSyncedFollowerCount();
}
result.is_exceeding_mem_soft_limit = isExceedingMemorySoftLimit();
result.total_nodes_count = getKeeperStateMachine()->getNodesCount();
result.last_zxid = getKeeperStateMachine()->getLastProcessedZxid();
return result;

View File

@ -110,6 +110,8 @@ public:
bool isLeaderAlive() const;
bool isExceedingMemorySoftLimit() const;
Keeper4LWInfo getPartiallyFilled4LWInfo() const;
/// @return follower count; if the node is not a leader, returns 0

View File

@ -810,7 +810,7 @@ class IColumn;
M(Bool, parallelize_output_from_storages, true, "Parallelize output for reading step from storage. It allows parallelizing query processing right after reading from storage if possible", 0) \
M(String, insert_deduplication_token, "", "If not empty, used for duplicate detection instead of data digest", 0) \
M(Bool, count_distinct_optimization, false, "Rewrite count distinct to subquery of group by", 0) \
M(Bool, throw_if_no_data_to_insert, true, "Enables or disables empty INSERTs, enabled by default", 0) \
M(Bool, throw_if_no_data_to_insert, true, "Allows or forbids empty INSERTs, enabled by default (throws an error on an empty insert)", 0) \
M(Bool, compatibility_ignore_auto_increment_in_create_table, false, "Ignore AUTO_INCREMENT keyword in column declaration if true, otherwise return error. It simplifies migration from MySQL", 0) \
M(Bool, multiple_joins_try_to_keep_original_names, false, "Do not add aliases to top level expression list on multiple joins rewrite", 0) \
M(Bool, optimize_sorting_by_input_stream_properties, true, "Optimize sorting by sorting properties of input stream", 0) \
@ -1048,7 +1048,7 @@ class IColumn;
M(UInt64, output_format_pretty_max_rows, 10000, "Rows limit for Pretty formats.", 0) \
M(UInt64, output_format_pretty_max_column_pad_width, 250, "Maximum width to pad all values in a column in Pretty formats.", 0) \
M(UInt64, output_format_pretty_max_value_width, 10000, "Maximum width of value to display in Pretty formats. If greater - it will be cut.", 0) \
M(Bool, output_format_pretty_color, true, "Use ANSI escape sequences to paint colors in Pretty formats", 0) \
M(UInt64Auto, output_format_pretty_color, "auto", "Use ANSI escape sequences in Pretty formats. 0 - disabled, 1 - enabled, 'auto' - enabled if a terminal.", 0) \
M(String, output_format_pretty_grid_charset, "UTF-8", "Charset for printing grid borders. Available charsets: ASCII, UTF-8 (default one).", 0) \
M(UInt64, output_format_parquet_row_group_size, 1000000, "Target row group size in rows.", 0) \
M(UInt64, output_format_parquet_row_group_size_bytes, 512 * 1024 * 1024, "Target row group size in bytes, before compression.", 0) \
@ -1061,7 +1061,7 @@ class IColumn;
M(Bool, output_format_parquet_parallel_encoding, true, "Do Parquet encoding in multiple threads. Requires output_format_parquet_use_custom_encoder.", 0) \
M(UInt64, output_format_parquet_data_page_size, 1024 * 1024, "Target page size in bytes, before compression.", 0) \
M(UInt64, output_format_parquet_batch_size, 1024, "Check page size every this many rows. Consider decreasing if you have columns with average values size above a few KBs.", 0) \
M(String, output_format_avro_codec, "", "Compression codec used for output. Possible values: 'null', 'deflate', 'snappy'.", 0) \
M(String, output_format_avro_codec, "", "Compression codec used for output. Possible values: 'null', 'deflate', 'snappy', 'zstd'.", 0) \
M(UInt64, output_format_avro_sync_interval, 16 * 1024, "Sync interval in bytes.", 0) \
M(String, output_format_avro_string_column_pattern, "", "For Avro format: regexp of String columns to select as AVRO string.", 0) \
M(UInt64, output_format_avro_rows_in_file, 1, "Max rows in a file (if permitted by storage)", 0) \

View File

@ -87,7 +87,8 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{"23.12", {{"allow_suspicious_ttl_expressions", true, false, "It is a new setting, and in previous versions the behavior was equivalent to allowing."},
{"input_format_parquet_allow_missing_columns", false, true, "Allow missing columns in Parquet files by default"},
{"input_format_orc_allow_missing_columns", false, true, "Allow missing columns in ORC files by default"},
{"input_format_arrow_allow_missing_columns", false, true, "Allow missing columns in Arrow files by default"}}},
{"input_format_arrow_allow_missing_columns", false, true, "Allow missing columns in Arrow files by default"},
{"output_format_pretty_color", true, "auto", "Setting is changed to allow also for auto value, disabling ANSI escapes if output is not a tty"}}},
{"23.9", {{"optimize_group_by_constant_keys", false, true, "Optimize group by constant keys by default"},
{"input_format_json_try_infer_named_tuples_from_objects", false, true, "Try to infer named Tuples from JSON objects by default"},
{"input_format_json_read_numbers_as_strings", false, true, "Allow to read numbers as strings in JSON formats by default"},

View File

@ -169,11 +169,9 @@ void SentryWriter::onFault(int sig, const std::string & error_message, const Sta
};
StackTrace::Frames frames;
StackTrace::symbolize(stack_trace.getFramePointers(), offset, stack_size, frames);
for (ssize_t i = stack_size - 1; i >= offset; --i)
auto sentry_add_stack_trace = [&](const StackTrace::Frame & current_frame)
{
const StackTrace::Frame & current_frame = frames[i];
sentry_value_t sentry_frame = sentry_value_new_object();
UInt64 frame_ptr = reinterpret_cast<UInt64>(current_frame.virtual_addr);
@ -190,7 +188,9 @@ void SentryWriter::onFault(int sig, const std::string & error_message, const Sta
sentry_value_set_by_key(sentry_frame, "lineno", sentry_value_new_int32(static_cast<int32_t>(current_frame.line.value())));
sentry_value_append(sentry_frames, sentry_frame);
}
};
StackTrace::forEachFrame(stack_trace.getFramePointers(), offset, stack_size, sentry_add_stack_trace, /* fatal= */ true);
}
/// Prepare data for https://develop.sentry.dev/sdk/event-payloads/threads/

View File

@ -320,7 +320,7 @@ SerializationPtr DataTypeTuple::doGetDefaultSerialization() const
{
String elem_name = have_explicit_names ? names[i] : toString(i + 1);
auto serialization = elems[i]->getDefaultSerialization();
serializations[i] = std::make_shared<SerializationNamed>(serialization, elem_name);
serializations[i] = std::make_shared<SerializationNamed>(serialization, elem_name, SubstreamType::TupleElement);
}
return std::make_shared<SerializationTuple>(std::move(serializations), have_explicit_names);
@ -335,7 +335,7 @@ SerializationPtr DataTypeTuple::getSerialization(const SerializationInfo & info)
{
String elem_name = have_explicit_names ? names[i] : toString(i + 1);
auto serialization = elems[i]->getSerialization(*info_tuple.getElementInfo(i));
serializations[i] = std::make_shared<SerializationNamed>(serialization, elem_name);
serializations[i] = std::make_shared<SerializationNamed>(serialization, elem_name, SubstreamType::TupleElement);
}
return std::make_shared<SerializationTuple>(std::move(serializations), have_explicit_names);

View File

@ -16,6 +16,7 @@
#include <Columns/ColumnConst.h>
#include <Parsers/IAST.h>
#include <Storages/ColumnsDescription.h>
#include <boost/algorithm/string/case_conv.hpp>
@ -294,6 +295,12 @@ Names getAllNestedColumnsForTable(const Block & block, const std::string & table
return names;
}
/// Looks up `column_name` among all physical columns and subcolumns of `columns`.
/// Returns true only if it is found, its type in storage is Nested, it is a subcolumn,
/// and the subcolumn itself has Array type.
bool isSubcolumnOfNested(const String & column_name, const ColumnsDescription & columns)
{
    auto candidate = columns.tryGetColumnOrSubcolumn(GetColumnsOptions::AllPhysical, column_name);
    if (!candidate)
        return false;

    return isNested(candidate->getTypeInStorage())
        && candidate->isSubcolumn()
        && isArray(candidate->type);
}
}
NestedColumnExtractHelper::NestedColumnExtractHelper(const Block & block_, bool case_insentive_)

View File

@ -7,6 +7,8 @@
namespace DB
{
class ColumnsDescription;
namespace Nested
{
std::string concatenateName(const std::string & nested_table_name, const std::string & nested_field_name);
@ -40,6 +42,9 @@ namespace Nested
/// Extract all column names that are nested for specifying table.
Names getAllNestedColumnsForTable(const Block & block, const std::string & table_name);
/// Returns true if @column_name is a subcolumn (of Array type) of any Nested column in @columns.
bool isSubcolumnOfNested(const String & column_name, const ColumnsDescription & columns);
}
/// Use this class to extract element columns from columns of nested type in a block, e.g. named Tuple.

View File

@ -49,11 +49,17 @@ ISerialization::Kind ISerialization::stringToKind(const String & str)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown serialization kind '{}'", str);
}
/// Substream types that can carry an arbitrary name (stored in `name_of_substream`).
/// SerializationNamed accepts only these types.
const std::set<SubstreamType> ISerialization::Substream::named_types
{
TupleElement,
NamedOffsets,
NamedNullMap,
};
String ISerialization::Substream::toString() const
{
if (type == TupleElement)
return fmt::format("TupleElement({}, escape_tuple_delimiter = {})",
tuple_element_name, escape_tuple_delimiter ? "true" : "false");
if (named_types.contains(type))
return fmt::format("{}({})", type, name_of_substream);
return String(magic_enum::enum_name(type));
}
@ -110,8 +116,10 @@ void ISerialization::serializeBinaryBulkWithMultipleStreams(
SerializeBinaryBulkSettings & settings,
SerializeBinaryBulkStatePtr & /* state */) const
{
settings.path.push_back(Substream::Regular);
if (WriteBuffer * stream = settings.getter(settings.path))
serializeBinaryBulk(column, *stream, offset, limit);
settings.path.pop_back();
}
void ISerialization::deserializeBinaryBulkWithMultipleStreams(
@ -121,6 +129,8 @@ void ISerialization::deserializeBinaryBulkWithMultipleStreams(
DeserializeBinaryBulkStatePtr & /* state */,
SubstreamsCache * cache) const
{
settings.path.push_back(Substream::Regular);
auto cached_column = getFromSubstreamsCache(cache, settings.path);
if (cached_column)
{
@ -133,6 +143,8 @@ void ISerialization::deserializeBinaryBulkWithMultipleStreams(
column = std::move(mutable_column);
addToSubstreamsCache(cache, settings.path, column);
}
settings.path.pop_back();
}
namespace
@ -161,16 +173,18 @@ String getNameForSubstreamPath(
stream_name += ".dict";
else if (it->type == Substream::SparseOffsets)
stream_name += ".sparse.idx";
else if (it->type == Substream::TupleElement)
else if (Substream::named_types.contains(it->type))
{
auto substream_name = "." + it->name_of_substream;
/// For compatibility reasons, we use %2E (escaped dot) instead of dot.
/// Because nested data may be represented not by Array of Tuple,
/// but by separate Array columns with names in a form of a.b,
/// and name is encoded as a whole.
if (escape_tuple_delimiter && it->escape_tuple_delimiter)
stream_name += escapeForFileName("." + it->tuple_element_name);
/// but by separate Array columns with names in a form of a.b,
/// and name is encoded as a whole.
if (it->type == Substream::TupleElement && escape_tuple_delimiter)
stream_name += escapeForFileName(substream_name);
else
stream_name += "." + it->tuple_element_name;
stream_name += substream_name;
}
}
@ -184,23 +198,31 @@ String ISerialization::getFileNameForStream(const NameAndTypePair & column, cons
return getFileNameForStream(column.getNameInStorage(), path);
}
bool isOffsetsOfNested(const ISerialization::SubstreamPath & path)
static bool isPossibleOffsetsOfNested(const ISerialization::SubstreamPath & path)
{
if (path.empty())
return false;
/// Arrays of Nested cannot be inside other types.
/// So it's ok to check only first element of path.
for (const auto & elem : path)
if (elem.type == ISerialization::Substream::ArrayElements)
return false;
/// Array offsets as a part of serialization of Array type.
if (path.size() == 1
&& path[0].type == ISerialization::Substream::ArraySizes)
return true;
return path.back().type == ISerialization::Substream::ArraySizes;
/// Array offsets as a separate subcolumn.
if (path.size() == 2
&& path[0].type == ISerialization::Substream::NamedOffsets
&& path[1].type == ISerialization::Substream::Regular
&& path[0].name_of_substream == "size0")
return true;
return false;
}
String ISerialization::getFileNameForStream(const String & name_in_storage, const SubstreamPath & path)
{
String stream_name;
auto nested_storage_name = Nested::extractTableName(name_in_storage);
if (name_in_storage != nested_storage_name && isOffsetsOfNested(path))
if (name_in_storage != nested_storage_name && isPossibleOffsetsOfNested(path))
stream_name = escapeForFileName(nested_storage_name);
else
stream_name = escapeForFileName(name_in_storage);

View File

@ -9,7 +9,7 @@
#include <boost/noncopyable.hpp>
#include <unordered_map>
#include <memory>
#include <variant>
#include <set>
namespace DB
{
@ -142,6 +142,8 @@ public:
NullMap,
TupleElement,
NamedOffsets,
NamedNullMap,
DictionaryKeys,
DictionaryIndexes,
@ -155,13 +157,13 @@ public:
Regular,
};
/// Types of substreams that can have arbitrary name.
static const std::set<Type> named_types;
Type type;
/// Index of tuple element, starting at 1 or name.
String tuple_element_name;
/// Do we need to escape a dot in filenames for tuple elements.
bool escape_tuple_delimiter = true;
/// Name of substream for type from 'named_types'.
String name_of_substream;
/// Data for current substream.
SubstreamData data;
@ -173,7 +175,6 @@ public:
mutable bool visited = false;
Substream(Type type_) : type(type_) {} /// NOLINT
String toString() const;
};
@ -393,6 +394,7 @@ protected:
using SerializationPtr = std::shared_ptr<const ISerialization>;
using Serializations = std::vector<SerializationPtr>;
using SerializationByName = std::unordered_map<String, SerializationPtr>;
using SubstreamType = ISerialization::Substream::Type;
template <typename State, typename StatePtr>
State * ISerialization::checkAndGetState(const StatePtr & state) const
@ -415,6 +417,4 @@ State * ISerialization::checkAndGetState(const StatePtr & state) const
return state_concrete;
}
bool isOffsetsOfNested(const ISerialization::SubstreamPath & path);
}

View File

@ -230,10 +230,10 @@ void SerializationArray::enumerateStreams(
const auto * column_array = data.column ? &assert_cast<const ColumnArray &>(*data.column) : nullptr;
auto offsets = column_array ? column_array->getOffsetsPtr() : nullptr;
auto offsets_serialization =
std::make_shared<SerializationNamed>(
std::make_shared<SerializationNumber<UInt64>>(),
"size" + std::to_string(getArrayLevel(settings.path)), false);
auto subcolumn_name = "size" + std::to_string(getArrayLevel(settings.path));
auto offsets_serialization = std::make_shared<SerializationNamed>(
std::make_shared<SerializationNumber<UInt64>>(),
subcolumn_name, SubstreamType::NamedOffsets);
auto offsets_column = offsets && !settings.position_independent_encoding
? arrayOffsetsToSizes(*offsets)

View File

@ -3,6 +3,23 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// Wraps `nested_` and remembers the substream name together with the type of the named substream.
/// Throws LOGICAL_ERROR if `substream_type_` is not listed in ISerialization::Substream::named_types.
SerializationNamed::SerializationNamed(
    const SerializationPtr & nested_,
    const String & name_,
    SubstreamType substream_type_)
    : SerializationWrapper(nested_)
    , name(name_)
    , substream_type(substream_type_)
{
    const bool is_supported = ISerialization::Substream::named_types.contains(substream_type);
    if (!is_supported)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "SerializationNamed doesn't support substream type {}", substream_type);
}
void SerializationNamed::enumerateStreams(
EnumerateStreamsSettings & settings,
const StreamCallback & callback,
@ -10,7 +27,7 @@ void SerializationNamed::enumerateStreams(
{
addToPath(settings.path);
settings.path.back().data = data;
settings.path.back().creator = std::make_shared<SubcolumnCreator>(name, escape_delimiter);
settings.path.back().creator = std::make_shared<SubcolumnCreator>(name, substream_type);
nested_serialization->enumerateStreams(settings, callback, data);
settings.path.pop_back();
@ -70,9 +87,8 @@ void SerializationNamed::deserializeBinaryBulkWithMultipleStreams(
void SerializationNamed::addToPath(SubstreamPath & path) const
{
path.push_back(Substream::TupleElement);
path.back().tuple_element_name = name;
path.back().escape_tuple_delimiter = escape_delimiter;
path.push_back(substream_type);
path.back().name_of_substream = name;
}
}

View File

@ -1,5 +1,4 @@
#pragma once
#include <DataTypes/Serializations/SerializationWrapper.h>
namespace DB
@ -14,14 +13,10 @@ class SerializationNamed final : public SerializationWrapper
{
private:
String name;
bool escape_delimiter;
SubstreamType substream_type;
public:
SerializationNamed(const SerializationPtr & nested_, const String & name_, bool escape_delimiter_ = true)
: SerializationWrapper(nested_)
, name(name_), escape_delimiter(escape_delimiter_)
{
}
SerializationNamed(const SerializationPtr & nested_, const String & name_, SubstreamType substream_type_);
const String & getElementName() const { return name; }
@ -61,16 +56,18 @@ private:
struct SubcolumnCreator : public ISubcolumnCreator
{
const String name;
const bool escape_delimiter;
SubstreamType substream_type;
SubcolumnCreator(const String & name_, bool escape_delimiter_)
: name(name_), escape_delimiter(escape_delimiter_) {}
SubcolumnCreator(const String & name_, SubstreamType substream_type_)
: name(name_), substream_type(substream_type_)
{
}
DataTypePtr create(const DataTypePtr & prev) const override { return prev; }
ColumnPtr create(const ColumnPtr & prev) const override { return prev; }
SerializationPtr create(const SerializationPtr & prev) const override
{
return std::make_shared<SerializationNamed>(prev, name, escape_delimiter);
return std::make_shared<SerializationNamed>(prev, name, substream_type);
}
};

View File

@ -45,7 +45,9 @@ void SerializationNullable::enumerateStreams(
const auto * type_nullable = data.type ? &assert_cast<const DataTypeNullable &>(*data.type) : nullptr;
const auto * column_nullable = data.column ? &assert_cast<const ColumnNullable &>(*data.column) : nullptr;
auto null_map_serialization = std::make_shared<SerializationNamed>(std::make_shared<SerializationNumber<UInt8>>(), "null", false);
auto null_map_serialization = std::make_shared<SerializationNamed>(
std::make_shared<SerializationNumber<UInt8>>(),
"null", SubstreamType::NamedNullMap);
settings.path.push_back(Substream::NullMap);
auto null_map_data = SubstreamData(null_map_serialization)

View File

@ -267,7 +267,7 @@ bool SlabsPolygonIndex::find(const Point & point, size_t & id) const
Coord y = point.y();
/** Not in bounding box */
if (x < sorted_x[0] || x > sorted_x.back())
if (x < sorted_x.front() || x > sorted_x.back())
return false;
bool found = false;

View File

@ -157,6 +157,12 @@ public:
auto y_ratio = y * kSplit;
auto x_bin = static_cast<int>(x_ratio);
auto y_bin = static_cast<int>(y_ratio);
/// If we have a lot of values and the argument is very close to max_x (max_y), x (y) can be equal to 1, so x_ratio (y_ratio) = kSplit.
if (x_bin == kSplit)
--x_bin;
/// => x_bin (y_bin) will be equal to kSplit, which can lead to out-of-bounds vector access.
if (y_bin == kSplit)
--y_bin;
return children[y_bin + x_bin * kSplit]->find(x_ratio - x_bin, y_ratio - y_bin);
}

View File

@ -36,8 +36,7 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
std::optional<size_t> file_size,
int flags,
char * existing_memory,
size_t alignment,
bool use_external_buffer)
size_t alignment)
{
if (file_size.has_value() && !*file_size)
return std::make_unique<ReadBufferFromEmptyFile>();
@ -149,8 +148,7 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
existing_memory,
buffer_alignment,
file_size,
settings.local_throttler,
use_external_buffer);
settings.local_throttler);
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown read method");

View File

@ -21,6 +21,5 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
std::optional<size_t> file_size = {},
int flags_ = -1,
char * existing_memory = nullptr,
size_t alignment = 0,
bool use_external_buffer = false);
size_t alignment = 0);
}

View File

@ -5,6 +5,7 @@
#include <Interpreters/Context.h>
#include <Interpreters/ProcessList.h>
#include <IO/SharedThreadPools.h>
#include <IO/WriteHelpers.h>
#include <Processors/Formats/IRowInputFormat.h>
#include <Processors/Formats/IRowOutputFormat.h>
#include <Processors/Formats/Impl/MySQLOutputFormat.h>
@ -451,6 +452,7 @@ OutputFormatPtr FormatFactory::getOutputFormatParallelIfPossible(
throw Exception(ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT, "Format {} is not suitable for output", name);
auto format_settings = _format_settings ? *_format_settings : getFormatSettings(context);
format_settings.is_writing_to_terminal = isWritingToTerminal(buf);
const Settings & settings = context->getSettingsRef();
@ -492,6 +494,7 @@ OutputFormatPtr FormatFactory::getOutputFormat(
auto format_settings = _format_settings ? *_format_settings : getFormatSettings(context);
format_settings.max_threads = context->getSettingsRef().max_threads;
/// Record whether the output buffer is attached to a terminal.
format_settings.is_writing_to_terminal = isWritingToTerminal(buf);
/** TODO: Materialization is needed, because formats can use the functions `IDataType`,
* which only work with full columns.

View File

@ -4,7 +4,7 @@
#include <Core/Defines.h>
#include <base/types.h>
#include <base/unit.h>
#include <Core/SettingsFields.h>
namespace DB
{
@ -34,6 +34,7 @@ struct FormatSettings
bool null_as_default = true;
bool decimal_trailing_zeros = false;
bool defaults_for_omitted_fields = true;
bool is_writing_to_terminal = false;
bool seekable_read = true;
UInt64 max_rows_to_read_for_schema_inference = 25000;
@ -272,7 +273,7 @@ struct FormatSettings
UInt64 max_rows = 10000;
UInt64 max_column_pad_width = 250;
UInt64 max_value_width = 10000;
bool color = true;
SettingFieldUInt64Auto color{"auto"};
bool output_format_pretty_row_numbers = false;

View File

@ -1053,6 +1053,13 @@ struct JSONExtractTree
bool insertResultToColumn(IColumn & dest, const Element & element) override
{
if (dest.getDataType() == TypeIndex::LowCardinality)
{
/// We do not need to handle nullability in that case
/// because nested node handles LowCardinality columns and will call proper overload of `insertData`
return nested->insertResultToColumn(dest, element);
}
ColumnNullable & col_null = assert_cast<ColumnNullable &>(dest);
if (!nested->insertResultToColumn(col_null.getNestedColumn(), element))
return false;

View File

@ -67,10 +67,8 @@ public:
char * existing_memory = nullptr,
size_t alignment = 0,
std::optional<size_t> file_size_ = std::nullopt,
ThrottlerPtr throttler_ = {},
bool use_external_buffer_ = false)
: AsynchronousReadBufferFromFileDescriptor(
reader_, priority_, -1, buf_size, existing_memory, alignment, file_size_, throttler_, use_external_buffer_)
ThrottlerPtr throttler_ = {})
: AsynchronousReadBufferFromFileDescriptor(reader_, priority_, -1, buf_size, existing_memory, alignment, file_size_, throttler_)
, file_name(file_name_)
{
file = OpenedFileCache::instance().get(file_name, flags);

View File

@ -97,11 +97,7 @@ bool AsynchronousReadBufferFromFileDescriptor::nextImpl()
/// No pending request. Do synchronous read.
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::SynchronousReadWaitMicroseconds);
if (!use_external_buffer)
result = asyncReadInto(memory.data(), memory.size(), DEFAULT_PREFETCH_PRIORITY).get();
else
/// External buffer will be substituted in place of internal_buffer (see CachedOnDiskReadBufferFromFile)
result = asyncReadInto(internal_buffer.begin(), internal_buffer.size(), DEFAULT_PREFETCH_PRIORITY).get();
result = asyncReadInto(memory.data(), memory.size(), DEFAULT_PREFETCH_PRIORITY).get();
}
chassert(result.size >= result.offset);
@ -114,9 +110,8 @@ bool AsynchronousReadBufferFromFileDescriptor::nextImpl()
if (bytes_read)
{
/// Adjust the working buffer so that it ignores `offset` bytes.
if (!use_external_buffer)
internal_buffer = Buffer(memory.data(), memory.data() + memory.size());
working_buffer = Buffer(internal_buffer.begin() + result.offset, internal_buffer.begin() + result.size);
internal_buffer = Buffer(memory.data(), memory.data() + memory.size());
working_buffer = Buffer(memory.data() + result.offset, memory.data() + result.size);
pos = working_buffer.begin();
}
@ -142,15 +137,13 @@ AsynchronousReadBufferFromFileDescriptor::AsynchronousReadBufferFromFileDescript
char * existing_memory,
size_t alignment,
std::optional<size_t> file_size_,
ThrottlerPtr throttler_,
bool use_external_buffer_)
ThrottlerPtr throttler_)
: ReadBufferFromFileBase(buf_size, existing_memory, alignment, file_size_)
, reader(reader_)
, base_priority(priority_)
, required_alignment(alignment)
, fd(fd_)
, throttler(throttler_)
, use_external_buffer(use_external_buffer_)
{
if (required_alignment > buf_size)
throw Exception(
@ -228,7 +221,7 @@ off_t AsynchronousReadBufferFromFileDescriptor::seek(off_t offset, int whence)
file_offset_of_buffer_end = seek_pos;
bytes_to_ignore = new_pos - seek_pos;
if (bytes_to_ignore >= internal_buffer.size() && !(use_external_buffer && internal_buffer.empty()))
if (bytes_to_ignore >= internal_buffer.size())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Logical error in AsynchronousReadBufferFromFileDescriptor, bytes_to_ignore ({}"
") >= internal_buffer.size() ({})", bytes_to_ignore, internal_buffer.size());

View File

@ -29,7 +29,6 @@ protected:
size_t bytes_to_ignore = 0; /// How many bytes should we ignore upon a new read request.
int fd;
ThrottlerPtr throttler;
bool use_external_buffer;
bool nextImpl() override;
@ -47,8 +46,7 @@ public:
char * existing_memory = nullptr,
size_t alignment = 0,
std::optional<size_t> file_size_ = std::nullopt,
ThrottlerPtr throttler_ = {},
bool use_external_buffer_ = false);
ThrottlerPtr throttler_ = {});
~AsynchronousReadBufferFromFileDescriptor() override;

View File

@ -35,7 +35,7 @@ URI::URI(const std::string & uri_)
/// Case when bucket name represented in domain name of S3 URL.
/// E.g. (https://bucket-name.s3.Region.amazonaws.com/key)
/// https://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html#virtual-hosted-style-access
static const RE2 virtual_hosted_style_pattern(R"((.+)\.(s3|cos|obs|oss)([.\-][a-z0-9\-.:]+))");
static const RE2 virtual_hosted_style_pattern(R"((.+)\.(s3|cos|obs|oss|eos)([.\-][a-z0-9\-.:]+))");
/// Case when bucket name and key represented in path of S3 URL.
/// E.g. (https://s3.Region.amazonaws.com/bucket-name/key)
@ -47,6 +47,7 @@ URI::URI(const std::string & uri_)
static constexpr auto COS = "COS";
static constexpr auto OBS = "OBS";
static constexpr auto OSS = "OSS";
static constexpr auto EOS = "EOS";
uri = Poco::URI(uri_);
@ -114,7 +115,7 @@ URI::URI(const std::string & uri_)
}
boost::to_upper(name);
if (name != S3 && name != COS && name != OBS && name != OSS)
if (name != S3 && name != COS && name != OBS && name != OSS && name != EOS)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Object storage system name is unrecognized in virtual hosted style S3 URI: {}",
quoteString(name));
@ -125,6 +126,8 @@ URI::URI(const std::string & uri_)
storage_name = OBS;
else if (name == OSS)
storage_name = OSS;
else if (name == EOS)
storage_name = EOS;
else
storage_name = COSN;
}

View File

@ -29,6 +29,7 @@
#include <Common/Exception.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/NaNUtils.h>
#include <Common/typeid_cast.h>
#include <IO/CompressionMethod.h>
#include <IO/WriteBuffer.h>
@ -36,6 +37,7 @@
#include <IO/VarInt.h>
#include <IO/DoubleConverter.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#ifdef __clang__
#pragma clang diagnostic push
@ -1405,6 +1407,12 @@ void writePointerHex(const void * ptr, WriteBuffer & buf);
String fourSpaceIndent(size_t indent);
/// Returns true only if `buf` can be typeid_cast to WriteBufferFromFileDescriptor,
/// its descriptor is STDOUT_FILENO, and isatty() reports that standard output is a terminal.
inline bool isWritingToTerminal(const WriteBuffer & buf)
{
    const auto * fd_buffer = typeid_cast<const WriteBufferFromFileDescriptor *>(&buf);
    if (!fd_buffer || fd_buffer->getFD() != STDOUT_FILENO)
        return false;

    return isatty(STDOUT_FILENO) != 0;
}
}
template<>

View File

@ -1797,8 +1797,8 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, si
rows,
ReadableSize(uncompressed_size),
ReadableSize(compressed_size),
static_cast<double>(uncompressed_size) / rows,
static_cast<double>(compressed_size) / rows,
rows ? static_cast<double>(uncompressed_size) / rows : 0.0,
rows ? static_cast<double>(compressed_size) / rows : 0.0,
static_cast<double>(uncompressed_size) / compressed_size,
static_cast<double>(rows) / elapsed_seconds,
ReadableSize(static_cast<double>(uncompressed_size) / elapsed_seconds),

View File

@ -817,7 +817,7 @@ bool FileCache::tryReserve(FileSegment & file_segment, const size_t size, FileCa
}
file_segment.reserved_size += size;
chassert(file_segment.reserved_size == queue_iterator->getEntry().size);
chassert(file_segment.reserved_size == queue_iterator->getEntry()->size);
if (query_context)
{

View File

@ -789,9 +789,9 @@ bool FileSegment::assertCorrectnessUnlocked(const FileSegmentGuard::Lock &) cons
const auto & entry = it->getEntry();
UNUSED(entry);
chassert(entry.size == reserved_size);
chassert(entry.key == key());
chassert(entry.offset == offset());
chassert(entry->size == reserved_size);
chassert(entry->key == key());
chassert(entry->offset == offset());
};
if (download_state == State::DOWNLOADED)

View File

@ -31,13 +31,14 @@ public:
std::atomic<size_t> size;
size_t hits = 0;
};
using EntryPtr = std::shared_ptr<Entry>;
class Iterator
{
public:
virtual ~Iterator() = default;
virtual const Entry & getEntry() const = 0;
virtual EntryPtr getEntry() const = 0;
virtual size_t increasePriority(const CacheGuard::Lock &) = 0;

View File

@ -36,49 +36,49 @@ IFileCachePriority::IteratorPtr LRUFileCachePriority::add( /// NOLINT
const CacheGuard::Lock & lock,
bool /* is_startup */)
{
return std::make_shared<LRUIterator>(add(Entry(key_metadata->key, offset, size, key_metadata), lock));
return std::make_shared<LRUIterator>(add(std::make_shared<Entry>(key_metadata->key, offset, size, key_metadata), lock));
}
LRUFileCachePriority::LRUIterator LRUFileCachePriority::add(Entry && entry, const CacheGuard::Lock & lock)
LRUFileCachePriority::LRUIterator LRUFileCachePriority::add(EntryPtr entry, const CacheGuard::Lock & lock)
{
if (entry.size == 0)
if (entry->size == 0)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Adding zero size entries to LRU queue is not allowed "
"(key: {}, offset: {})", entry.key, entry.offset);
"(key: {}, offset: {})", entry->key, entry->offset);
}
#ifndef NDEBUG
for (const auto & queue_entry : queue)
{
/// entry.size == 0 means entry was invalidated.
if (queue_entry.size != 0 && queue_entry.key == entry.key && queue_entry.offset == entry.offset)
if (queue_entry->size != 0 && queue_entry->key == entry->key && queue_entry->offset == entry->offset)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Attempt to add duplicate queue entry to queue. "
"(Key: {}, offset: {}, size: {})",
entry.key, entry.offset, entry.size);
entry->key, entry->offset, entry->size);
}
#endif
const auto & size_limit = getSizeLimit(lock);
if (size_limit && current_size + entry.size > size_limit)
if (size_limit && current_size + entry->size > size_limit)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Not enough space to add {}:{} with size {}: current size: {}/{}",
entry.key, entry.offset, entry.size, current_size, size_limit);
entry->key, entry->offset, entry->size, current_size, size_limit);
}
auto iterator = queue.insert(queue.end(), entry);
updateSize(entry.size);
updateSize(entry->size);
updateElementsCount(1);
LOG_TEST(
log, "Added entry into LRU queue, key: {}, offset: {}, size: {}",
entry.key, entry.offset, entry.size);
entry->key, entry->offset, entry->size);
return LRUIterator(this, iterator);
}
@ -86,15 +86,16 @@ LRUFileCachePriority::LRUIterator LRUFileCachePriority::add(Entry && entry, cons
LRUFileCachePriority::LRUQueue::iterator LRUFileCachePriority::remove(LRUQueue::iterator it, const CacheGuard::Lock &)
{
/// If size is 0, entry is invalidated, current_elements_num was already updated.
if (it->size)
const auto & entry = **it;
if (entry.size)
{
updateSize(-it->size);
updateSize(-entry.size);
updateElementsCount(-1);
}
LOG_TEST(
log, "Removed entry from LRU queue, key: {}, offset: {}, size: {}",
it->key, it->offset, it->size);
entry.key, entry.offset, entry.size);
return queue.erase(it);
}
@ -143,27 +144,28 @@ void LRUFileCachePriority::iterate(IterateFunc && func, const CacheGuard::Lock &
{
for (auto it = queue.begin(); it != queue.end();)
{
auto locked_key = it->key_metadata->tryLock();
if (!locked_key || it->size == 0)
const auto & entry = **it;
auto locked_key = entry.key_metadata->tryLock();
if (!locked_key || entry.size == 0)
{
it = remove(it, lock);
continue;
}
auto metadata = locked_key->tryGetByOffset(it->offset);
auto metadata = locked_key->tryGetByOffset(entry.offset);
if (!metadata)
{
it = remove(it, lock);
continue;
}
if (metadata->size() != it->size)
if (metadata->size() != entry.size)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Mismatch of file segment size in file segment metadata "
"and priority queue: {} != {} ({})",
it->size, metadata->size(), metadata->file_segment->getInfoForLog());
entry.size, metadata->size(), metadata->file_segment->getInfoForLog());
}
auto result = func(*locked_key, metadata);
@ -249,7 +251,7 @@ bool LRUFileCachePriority::collectCandidatesForEviction(
LRUFileCachePriority::LRUIterator LRUFileCachePriority::move(LRUIterator & it, LRUFileCachePriority & other, const CacheGuard::Lock &)
{
const auto & entry = it.getEntry();
const auto & entry = *it.getEntry();
if (entry.size == 0)
{
throw Exception(
@ -261,7 +263,7 @@ LRUFileCachePriority::LRUIterator LRUFileCachePriority::move(LRUIterator & it, L
for (const auto & queue_entry : queue)
{
/// entry.size == 0 means entry was invalidated.
if (queue_entry.size != 0 && queue_entry.key == entry.key && queue_entry.offset == entry.offset)
if (queue_entry->size != 0 && queue_entry->key == entry.key && queue_entry->offset == entry.offset)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Attempt to add duplicate queue entry to queue. "
@ -347,34 +349,36 @@ void LRUFileCachePriority::LRUIterator::invalidate()
{
assertValid();
const auto & entry = *iterator;
LOG_TEST(
cache_priority->log,
"Invalidating entry in LRU queue. Key: {}, offset: {}, previous size: {}",
iterator->key, iterator->offset, iterator->size);
entry->key, entry->offset, entry->size);
cache_priority->updateSize(-iterator->size);
cache_priority->updateSize(-entry->size);
cache_priority->updateElementsCount(-1);
iterator->size = 0;
entry->size = 0;
}
void LRUFileCachePriority::LRUIterator::updateSize(int64_t size)
{
assertValid();
const auto & entry = *iterator;
LOG_TEST(
cache_priority->log,
"Update size with {} in LRU queue for key: {}, offset: {}, previous size: {}",
size, iterator->key, iterator->offset, iterator->size);
size, entry->key, entry->offset, entry->size);
cache_priority->updateSize(size);
iterator->size += size;
entry->size += size;
}
size_t LRUFileCachePriority::LRUIterator::increasePriority(const CacheGuard::Lock &)
{
assertValid();
cache_priority->queue.splice(cache_priority->queue.end(), cache_priority->queue, iterator);
return ++iterator->hits;
return ++((*iterator)->hits);
}
void LRUFileCachePriority::LRUIterator::assertValid() const

View File

@ -15,7 +15,7 @@ class LRUFileCachePriority final : public IFileCachePriority
{
private:
class LRUIterator;
using LRUQueue = std::list<Entry>;
using LRUQueue = std::list<EntryPtr>;
friend class SLRUFileCachePriority;
public:
@ -76,7 +76,7 @@ private:
void iterate(IterateFunc && func, const CacheGuard::Lock &);
LRUIterator move(LRUIterator & it, LRUFileCachePriority & other, const CacheGuard::Lock &);
LRUIterator add(Entry && entry, const CacheGuard::Lock &);
LRUIterator add(EntryPtr entry, const CacheGuard::Lock &);
};
class LRUFileCachePriority::LRUIterator : public IFileCachePriority::Iterator
@ -91,7 +91,7 @@ public:
LRUIterator & operator =(const LRUIterator & other);
bool operator ==(const LRUIterator & other) const;
const Entry & getEntry() const override { return *iterator; }
EntryPtr getEntry() const override { return *iterator; }
size_t increasePriority(const CacheGuard::Lock &) override;

View File

@ -61,18 +61,18 @@ IFileCachePriority::IteratorPtr SLRUFileCachePriority::add( /// NOLINT
/// because we do not know the distribution between queues after server restart.
if (probationary_queue.canFit(size, lock))
{
auto lru_iterator = probationary_queue.add(Entry(key_metadata->key, offset, size, key_metadata), lock);
auto lru_iterator = probationary_queue.add(std::make_shared<Entry>(key_metadata->key, offset, size, key_metadata), lock);
return std::make_shared<SLRUIterator>(this, std::move(lru_iterator), false);
}
else
{
auto lru_iterator = protected_queue.add(Entry(key_metadata->key, offset, size, key_metadata), lock);
auto lru_iterator = protected_queue.add(std::make_shared<Entry>(key_metadata->key, offset, size, key_metadata), lock);
return std::make_shared<SLRUIterator>(this, std::move(lru_iterator), true);
}
}
else
{
auto lru_iterator = probationary_queue.add(Entry(key_metadata->key, offset, size, key_metadata), lock);
auto lru_iterator = probationary_queue.add(std::make_shared<Entry>(key_metadata->key, offset, size, key_metadata), lock);
return std::make_shared<SLRUIterator>(this, std::move(lru_iterator), false);
}
}
@ -151,7 +151,7 @@ void SLRUFileCachePriority::increasePriority(SLRUIterator & iterator, const Cach
/// Entry is in probationary queue.
/// We need to move it to protected queue.
const size_t size = iterator.getEntry().size;
const size_t size = iterator.getEntry()->size;
if (size > protected_queue.getSizeLimit(lock))
{
/// Entry size is bigger than the whole protected queue limit.
@ -205,7 +205,7 @@ void SLRUFileCachePriority::increasePriority(SLRUIterator & iterator, const Cach
/// All checks passed, now we can move downgrade candidates to
/// probationary queue and our entry to protected queue.
Entry entry_copy = iterator.getEntry();
EntryPtr entry = iterator.getEntry();
iterator.lru_iterator.remove(lock);
for (const auto & [key, key_candidates] : downgrade_candidates)
@ -218,7 +218,7 @@ void SLRUFileCachePriority::increasePriority(SLRUIterator & iterator, const Cach
}
}
iterator.lru_iterator = protected_queue.add(std::move(entry_copy), lock);
iterator.lru_iterator = protected_queue.add(entry, lock);
iterator.is_protected = true;
}
@ -257,21 +257,21 @@ SLRUFileCachePriority::SLRUIterator::SLRUIterator(
bool is_protected_)
: cache_priority(cache_priority_)
, lru_iterator(lru_iterator_)
, entry(lru_iterator.getEntry())
, is_protected(is_protected_)
{
}
const SLRUFileCachePriority::Entry & SLRUFileCachePriority::SLRUIterator::getEntry() const
SLRUFileCachePriority::EntryPtr SLRUFileCachePriority::SLRUIterator::getEntry() const
{
assertValid();
return lru_iterator.getEntry();
return entry;
}
size_t SLRUFileCachePriority::SLRUIterator::increasePriority(const CacheGuard::Lock & lock)
{
assertValid();
cache_priority->increasePriority(*this, lock);
return getEntry().hits;
return getEntry()->hits;
}
void SLRUFileCachePriority::SLRUIterator::updateSize(int64_t size)

View File

@ -11,10 +11,6 @@ namespace DB
/// the head of the queue, and the record with the highest priority is stored at the tail.
class SLRUFileCachePriority : public IFileCachePriority
{
private:
using LRUIterator = LRUFileCachePriority::LRUIterator;
using LRUQueue = std::list<Entry>;
public:
class SLRUIterator;
@ -62,10 +58,10 @@ class SLRUFileCachePriority::SLRUIterator : public IFileCachePriority::Iterator
public:
SLRUIterator(
SLRUFileCachePriority * cache_priority_,
LRUIterator && lru_iterator_,
LRUFileCachePriority::LRUIterator && lru_iterator_,
bool is_protected_);
const Entry & getEntry() const override;
EntryPtr getEntry() const override;
size_t increasePriority(const CacheGuard::Lock &) override;
@ -81,7 +77,8 @@ private:
void assertValid() const;
SLRUFileCachePriority * cache_priority;
mutable LRUIterator lru_iterator;
LRUFileCachePriority::LRUIterator lru_iterator;
const EntryPtr entry;
bool is_protected;
};

View File

@ -35,6 +35,7 @@ namespace ErrorCodes
{
extern const int KEEPER_EXCEPTION;
extern const int LOGICAL_ERROR;
extern const int NO_ELEMENTS_IN_CONFIG;
}
namespace FailPoints
@ -124,21 +125,33 @@ ClusterDiscovery::ClusterDiscovery(
for (const auto & key : config_keys)
{
String prefix = config_prefix + "." + key + ".discovery";
if (!config.has(prefix))
String cluster_config_prefix = config_prefix + "." + key + ".discovery";
if (!config.has(cluster_config_prefix))
continue;
String zk_root = config.getString(cluster_config_prefix + ".path");
if (zk_root.empty())
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "ZooKeeper path for cluster '{}' is empty", key);
const auto & password = config.getString(cluster_config_prefix + ".password", "");
const auto & cluster_secret = config.getString(cluster_config_prefix + ".secret", "");
if (!password.empty() && !cluster_secret.empty())
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Both 'password' and 'secret' are specified for cluster '{}', only one option can be used at the same time", key);
clusters_info.emplace(
key,
ClusterInfo(
/* name_= */ key,
/* zk_root_= */ config.getString(prefix + ".path"),
/* host_name= */ config.getString(prefix + ".my_hostname", getFQDNOrHostName()),
/* zk_root_= */ zk_root,
/* host_name= */ config.getString(cluster_config_prefix + ".my_hostname", getFQDNOrHostName()),
/* username= */ config.getString(cluster_config_prefix + ".user", context->getUserName()),
/* password= */ password,
/* cluster_secret= */ cluster_secret,
/* port= */ context->getTCPPort(),
/* secure= */ config.getBool(prefix + ".secure", false),
/* shard_id= */ config.getUInt(prefix + ".shard", 0),
/* observer_mode= */ ConfigHelper::getBool(config, prefix + ".observer"),
/* invisible= */ ConfigHelper::getBool(config, prefix + ".invisible")
/* secure= */ config.getBool(cluster_config_prefix + ".secure", false),
/* shard_id= */ config.getUInt(cluster_config_prefix + ".shard", 0),
/* observer_mode= */ ConfigHelper::getBool(config, cluster_config_prefix + ".observer"),
/* invisible= */ ConfigHelper::getBool(config, cluster_config_prefix + ".invisible")
)
);
}
@ -248,15 +261,15 @@ ClusterPtr ClusterDiscovery::makeCluster(const ClusterInfo & cluster_info)
bool secure = cluster_info.current_node.secure;
ClusterConnectionParameters params{
/* username= */ context->getUserName(),
/* password= */ "",
/* username= */ cluster_info.username,
/* password= */ cluster_info.password,
/* clickhouse_port= */ secure ? context->getTCPPortSecure().value_or(DBMS_DEFAULT_SECURE_PORT) : context->getTCPPort(),
/* treat_local_as_remote= */ false,
/* treat_local_port_as_remote= */ false, /// should be set only for clickhouse-local, but cluster discovery is not used there
/* secure= */ secure,
/* priority= */ Priority{1},
/* cluster_name= */ "",
/* password= */ ""};
/* cluster_name= */ cluster_info.name,
/* cluster_secret= */ cluster_info.cluster_secret};
auto cluster = std::make_shared<Cluster>(
context->getSettingsRef(),
shards,

View File

@ -82,19 +82,31 @@ private:
/// they are mutually invisible to each other.
bool current_cluster_is_invisible = false;
explicit ClusterInfo(const String & name_,
const String & zk_root_,
const String & host_name,
UInt16 port,
bool secure,
size_t shard_id,
bool observer_mode,
bool invisible)
bool is_secure_connection = false;
String username;
String password;
String cluster_secret;
/// Builds the description of one discovered cluster together with the current node's entry in it.
///  - `name_`, `zk_root_`: stored as the cluster name and its ZooKeeper root path.
///  - `host_name`, `port`: joined as "<host_name>:<port>" to form the current node's address.
///  - `username_`, `password_`, `cluster_secret_`: stored unchanged in the corresponding fields.
///  - `secure`: passed to the current node and also remembered in `is_secure_connection`.
///  - `shard_id`: passed to the current node.
///  - `observer_mode`, `invisible`: stored in `current_node_is_observer` and `current_cluster_is_invisible`.
ClusterInfo(const String & name_,
const String & zk_root_,
const String & host_name,
const String & username_,
const String & password_,
const String & cluster_secret_,
UInt16 port,
bool secure,
size_t shard_id,
bool observer_mode,
bool invisible)
: name(name_)
, zk_root(zk_root_)
/// The node address is the textual "host:port" pair.
, current_node(host_name + ":" + toString(port), secure, shard_id)
, current_node_is_observer(observer_mode)
, current_cluster_is_invisible(invisible)
, is_secure_connection(secure)
, username(username_)
, password(password_)
, cluster_secret(cluster_secret_)
{
}
};

View File

@ -215,6 +215,8 @@ struct ContextSharedPart : boost::noncopyable
mutable zkutil::ZooKeeperPtr zookeeper TSA_GUARDED_BY(zookeeper_mutex); /// Client for ZooKeeper.
ConfigurationPtr zookeeper_config TSA_GUARDED_BY(zookeeper_mutex); /// Stores zookeeper configs
ConfigurationPtr sensitive_data_masker_config;
#if USE_NURAFT
mutable std::mutex keeper_dispatcher_mutex;
mutable std::shared_ptr<KeeperDispatcher> keeper_dispatcher TSA_GUARDED_BY(keeper_dispatcher_mutex);
@ -3369,6 +3371,16 @@ bool Context::hasAuxiliaryZooKeeper(const String & name) const
return getConfigRef().has("auxiliary_zookeepers." + name);
}
void Context::reloadQueryMaskingRulesIfChanged(const ConfigurationPtr & config) const
{
const auto old_config = shared->sensitive_data_masker_config;
if (old_config && isSameConfiguration(*config, *old_config, "query_masking_rules"))
return;
SensitiveDataMasker::setInstance(std::make_unique<SensitiveDataMasker>(*config, "query_masking_rules"));
shared->sensitive_data_masker_config = config;
}
InterserverCredentialsPtr Context::getInterserverCredentials() const
{
return shared->interserver_io_credentials.get();

View File

@ -964,6 +964,8 @@ public:
// Reload Zookeeper
void reloadZooKeeperIfChanged(const ConfigurationPtr & config) const;
void reloadQueryMaskingRulesIfChanged(const ConfigurationPtr & config) const;
void setSystemZooKeeperLogAfterInitializationIfNeeded();
/// --- Caches ------------------------------------------------------------------------------------------

View File

@ -198,14 +198,14 @@ void DatabaseCatalog::createBackgroundTasks()
if (Context::getGlobalContextInstance()->getApplicationType() == Context::ApplicationType::SERVER && unused_dir_cleanup_period_sec)
{
auto cleanup_task_holder
= getContext()->getSchedulePool().createTask("DatabaseCatalog", [this]() { this->cleanupStoreDirectoryTask(); });
= getContext()->getSchedulePool().createTask("DatabaseCatalogCleanupStoreDirectoryTask", [this]() { this->cleanupStoreDirectoryTask(); });
cleanup_task = std::make_unique<BackgroundSchedulePoolTaskHolder>(std::move(cleanup_task_holder));
}
auto drop_task_holder = getContext()->getSchedulePool().createTask("DatabaseCatalog", [this](){ this->dropTableDataTask(); });
auto drop_task_holder = getContext()->getSchedulePool().createTask("DatabaseCatalogDropTableTask", [this](){ this->dropTableDataTask(); });
drop_task = std::make_unique<BackgroundSchedulePoolTaskHolder>(std::move(drop_task_holder));
auto reload_disks_task_holder = getContext()->getSchedulePool().createTask("DatabaseCatalog", [this](){ this->reloadDisksTask(); });
auto reload_disks_task_holder = getContext()->getSchedulePool().createTask("DatabaseCatalogReloadDisksTask", [this](){ this->reloadDisksTask(); });
reload_disks_task = std::make_unique<BackgroundSchedulePoolTaskHolder>(std::move(reload_disks_task_holder));
}

View File

@ -1717,18 +1717,14 @@ Block HashJoin::joinBlockImpl(
for (size_t i = 0; i < required_right_keys.columns(); ++i)
{
const auto & right_key = required_right_keys.getByPosition(i);
// renamed ???
if (!block.findByName(right_key.name))
{
/// asof column is already in block.
if (join_features.is_asof_join && right_key.name == table_join->getOnlyClause().key_names_right.back())
continue;
/// asof column is already in block.
if (join_features.is_asof_join && right_key.name == table_join->getOnlyClause().key_names_right.back())
continue;
const auto & left_column = block.getByName(required_right_keys_sources[i]);
const auto & right_col_name = getTableJoin().renamedRightColumnName(right_key.name);
auto right_col = copyLeftKeyColumnToRight(right_key.type, right_col_name, left_column);
block.insert(std::move(right_col));
}
const auto & left_column = block.getByName(required_right_keys_sources[i]);
const auto & right_col_name = getTableJoin().renamedRightColumnName(right_key.name);
auto right_col = copyLeftKeyColumnToRight(right_key.type, right_col_name, left_column);
block.insert(std::move(right_col));
}
}
else if (has_required_right_keys)
@ -1738,19 +1734,16 @@ Block HashJoin::joinBlockImpl(
{
const auto & right_key = required_right_keys.getByPosition(i);
auto right_col_name = getTableJoin().renamedRightColumnName(right_key.name);
if (!block.findByName(right_col_name))
{
/// asof column is already in block.
if (join_features.is_asof_join && right_key.name == table_join->getOnlyClause().key_names_right.back())
continue;
/// asof column is already in block.
if (join_features.is_asof_join && right_key.name == table_join->getOnlyClause().key_names_right.back())
continue;
const auto & left_column = block.getByName(required_right_keys_sources[i]);
auto right_col = copyLeftKeyColumnToRight(right_key.type, right_col_name, left_column, &added_columns.filter);
block.insert(std::move(right_col));
const auto & left_column = block.getByName(required_right_keys_sources[i]);
auto right_col = copyLeftKeyColumnToRight(right_key.type, right_col_name, left_column, &added_columns.filter);
block.insert(std::move(right_col));
if constexpr (join_features.need_replication)
right_keys_to_replicate.push_back(block.getPositionByName(right_col_name));
}
if constexpr (join_features.need_replication)
right_keys_to_replicate.push_back(block.getPositionByName(right_col_name));
}
}
@ -2009,12 +2002,14 @@ struct AdderNonJoined
/// Based on:
/// - map offsetInternal saved in used_flags for single disjuncts
/// - flags in BlockWithFlags for multiple disjuncts
template <bool multiple_disjuncts>
class NotJoinedHash final : public NotJoinedBlocks::RightColumnsFiller
{
public:
NotJoinedHash(const HashJoin & parent_, UInt64 max_block_size_)
: parent(parent_), max_block_size(max_block_size_), current_block_start(0)
NotJoinedHash(const HashJoin & parent_, UInt64 max_block_size_, bool multiple_disjuncts_)
: parent(parent_)
, max_block_size(max_block_size_)
, multiple_disjuncts(multiple_disjuncts_)
, current_block_start(0)
{
if (parent.data == nullptr)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot join after data has been released");
@ -2040,7 +2035,7 @@ public:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown JOIN strictness '{}' (must be on of: ANY, ALL, ASOF)", parent.strictness);
}
if constexpr (!multiple_disjuncts)
if (!multiple_disjuncts)
{
fillNullsFromBlocks(columns_right, rows_added);
}
@ -2051,6 +2046,7 @@ public:
private:
const HashJoin & parent;
UInt64 max_block_size;
bool multiple_disjuncts;
size_t current_block_start;
@ -2116,7 +2112,7 @@ private:
{
size_t rows_added = 0;
if constexpr (multiple_disjuncts)
if (multiple_disjuncts)
{
if (!used_position.has_value())
used_position = parent.data->blocks.begin();
@ -2206,23 +2202,23 @@ IBlocksStreamPtr HashJoin::getNonJoinedBlocks(const Block & left_sample_block,
{
if (!JoinCommon::hasNonJoinedBlocks(*table_join))
return {};
size_t left_columns_count = left_sample_block.columns();
bool multiple_disjuncts = !table_join->oneDisjunct();
if (!multiple_disjuncts)
{
/// With multiple disjuncts, all keys are in sample_block_with_columns_to_add, so invariant is not held
size_t expected_columns_count = left_columns_count + required_right_keys.columns() + sample_block_with_columns_to_add.columns();
if (expected_columns_count != result_sample_block.columns())
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected number of columns in result sample block: {} instead of {} ({} + {} + {})",
result_sample_block.columns(), expected_columns_count,
left_columns_count, required_right_keys.columns(), sample_block_with_columns_to_add.columns());
}
}
if (multiple_disjuncts)
{
/// ... calculate `left_columns_count` ...
size_t left_columns_count = left_sample_block.columns();
auto non_joined = std::make_unique<NotJoinedHash<true>>(*this, max_block_size);
return std::make_unique<NotJoinedBlocks>(std::move(non_joined), result_sample_block, left_columns_count, *table_join);
}
else
{
size_t left_columns_count = left_sample_block.columns();
assert(left_columns_count == result_sample_block.columns() - required_right_keys.columns() - sample_block_with_columns_to_add.columns());
auto non_joined = std::make_unique<NotJoinedHash<false>>(*this, max_block_size);
return std::make_unique<NotJoinedBlocks>(std::move(non_joined), result_sample_block, left_columns_count, *table_join);
}
auto non_joined = std::make_unique<NotJoinedHash>(*this, max_block_size, multiple_disjuncts);
return std::make_unique<NotJoinedBlocks>(std::move(non_joined), result_sample_block, left_columns_count, *table_join);
}
void HashJoin::reuseJoinedData(const HashJoin & join)

View File

@ -399,7 +399,7 @@ public:
void setMaxJoinedBlockRows(size_t value) { max_joined_block_rows = value; }
private:
template<bool> friend class NotJoinedHash;
friend class NotJoinedHash;
friend class JoinSource;

View File

@ -1124,7 +1124,7 @@ IBlocksStreamPtr MergeJoin::getNonJoinedBlocks(
if (table_join->strictness() == JoinStrictness::All && (is_right || is_full))
{
size_t left_columns_count = left_sample_block.columns();
assert(left_columns_count == result_sample_block.columns() - right_columns_to_add.columns());
chassert(left_columns_count == result_sample_block.columns() - right_columns_to_add.columns());
auto non_joined = std::make_unique<NotJoinedMerge>(*this, max_block_size);
return std::make_unique<NotJoinedBlocks>(std::move(non_joined), result_sample_block, left_columns_count, *table_join);
}

View File

@ -546,7 +546,7 @@ void ThreadStatus::logToQueryThreadLog(QueryThreadLog & thread_log, const String
static String getCleanQueryAst(const ASTPtr q, ContextPtr context)
{
String res = serializeAST(*q);
if (auto * masker = SensitiveDataMasker::getInstance())
if (auto masker = SensitiveDataMasker::getInstance())
masker->wipeSensitiveData(res);
res = res.substr(0, context->getSettingsRef().log_queries_cut_to_length);

View File

@ -1,25 +1,23 @@
#include "OwnSplitChannel.h"
#include "OwnFormattingChannel.h"
#include <Core/Block.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <Interpreters/TextLog.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <sys/time.h>
#include <Poco/Message.h>
#include <Common/CurrentThread.h>
#include <Common/DNSResolver.h>
#include <Common/setThreadName.h>
#include <Common/LockMemoryExceptionInThread.h>
#include <base/getThreadId.h>
#include <Common/SensitiveDataMasker.h>
#include <Common/IO.h>
namespace DB
{
void OwnSplitChannel::log(const Poco::Message & msg)
{
#ifndef WITHOUT_TEXT_LOG
auto logs_queue = CurrentThread::getInternalTextLogsQueue();
@ -27,7 +25,7 @@ void OwnSplitChannel::log(const Poco::Message & msg)
return;
#endif
if (auto * masker = SensitiveDataMasker::getInstance())
if (auto masker = SensitiveDataMasker::getInstance())
{
auto message_text = msg.getText();
auto matches = masker->wipeSensitiveData(message_text);

View File

@ -1,12 +1,13 @@
#pragma once
#include <atomic>
#include <vector>
#include <map>
#include <mutex>
#include <Poco/AutoPtr.h>
#include <Poco/Channel.h>
#include "ExtendedLogChannel.h"
#ifndef WITHOUT_TEXT_LOG
namespace DB
{

View File

@ -520,8 +520,11 @@ static avro::Codec getCodec(const std::string & codec_name)
if (codec_name == "null") return avro::Codec::NULL_CODEC;
if (codec_name == "deflate") return avro::Codec::DEFLATE_CODEC;
if (codec_name == "zstd")
return avro::Codec::ZSTD_CODEC;
#ifdef SNAPPY_CODEC_AVAILABLE
if (codec_name == "snappy") return avro::Codec::SNAPPY_CODEC;
if (codec_name == "snappy")
return avro::Codec::SNAPPY_CODEC;
#endif
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Avro codec {} is not available", codec_name);

View File

@ -12,8 +12,8 @@ namespace DB
{
PrettyBlockOutputFormat::PrettyBlockOutputFormat(
WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_, bool mono_block_)
: IOutputFormat(header_, out_), format_settings(format_settings_), serializations(header_.getSerializations()), mono_block(mono_block_)
WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_, bool mono_block_, bool color_)
: IOutputFormat(header_, out_), format_settings(format_settings_), serializations(header_.getSerializations()), color(color_), mono_block(mono_block_)
{
}
@ -237,7 +237,7 @@ void PrettyBlockOutputFormat::writeChunk(const Chunk & chunk, PortKind port_kind
const auto & col = header.getByPosition(i);
if (format_settings.pretty.color)
if (color)
writeCString("\033[1m", out);
if (col.type->shouldAlignRightInPrettyFormats())
@ -255,7 +255,7 @@ void PrettyBlockOutputFormat::writeChunk(const Chunk & chunk, PortKind port_kind
writeChar(' ', out);
}
if (format_settings.pretty.color)
if (color)
writeCString("\033[0m", out);
}
writeCString(" ", out);
@ -335,7 +335,7 @@ void PrettyBlockOutputFormat::writeValueWithPadding(
reinterpret_cast<const UInt8 *>(serialized_value.data()), serialized_value.size(), 0, 1 + format_settings.pretty.max_value_width));
const char * ellipsis = format_settings.pretty.charset == FormatSettings::Pretty::Charset::UTF8 ? "" : "~";
if (format_settings.pretty.color)
if (color)
{
serialized_value += "\033[31;1m";
serialized_value += ellipsis;

View File

@ -5,7 +5,6 @@
#include <Formats/FormatSettings.h>
#include <Formats/FormatFactory.h>
namespace DB
{
@ -19,10 +18,8 @@ class PrettyBlockOutputFormat : public IOutputFormat
{
public:
/// no_escapes - do not use ANSI escape sequences - to display in the browser, not in the console.
PrettyBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_, bool mono_block_);
PrettyBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_, bool mono_block_, bool color_);
String getName() const override { return "PrettyBlockOutputFormat"; }
protected:
void consume(Chunk) override;
void consumeTotals(Chunk) override;
@ -57,6 +54,8 @@ protected:
total_rows = 0;
}
bool color;
private:
bool mono_block;
/// For mono_block == true only
@ -73,13 +72,8 @@ void registerPrettyFormatWithNoEscapesAndMonoBlock(FormatFactory & factory, cons
const Block & sample,
const FormatSettings & format_settings)
{
if (no_escapes)
{
FormatSettings changed_settings = format_settings;
changed_settings.pretty.color = false;
return std::make_shared<OutputFormat>(buf, sample, changed_settings, mono_block);
}
return std::make_shared<OutputFormat>(buf, sample, format_settings, mono_block);
bool color = !no_escapes && format_settings.pretty.color.valueOr(format_settings.is_writing_to_terminal);
return std::make_shared<OutputFormat>(buf, sample, format_settings, mono_block, color);
});
if (!mono_block)
factory.markOutputFormatSupportsParallelFormatting(name);

View File

@ -48,8 +48,8 @@ GridSymbols ascii_grid_symbols {
}
PrettyCompactBlockOutputFormat::PrettyCompactBlockOutputFormat(WriteBuffer & out_, const Block & header, const FormatSettings & format_settings_, bool mono_block_)
: PrettyBlockOutputFormat(out_, header, format_settings_, mono_block_)
PrettyCompactBlockOutputFormat::PrettyCompactBlockOutputFormat(WriteBuffer & out_, const Block & header, const FormatSettings & format_settings_, bool mono_block_, bool color_)
: PrettyBlockOutputFormat(out_, header, format_settings_, mono_block_, color_)
{
}
@ -87,18 +87,18 @@ void PrettyCompactBlockOutputFormat::writeHeader(
for (size_t k = 0; k < max_widths[i] - name_widths[i]; ++k)
writeCString(grid_symbols.dash, out);
if (format_settings.pretty.color)
if (color)
writeCString("\033[1m", out);
writeString(col.name, out);
if (format_settings.pretty.color)
if (color)
writeCString("\033[0m", out);
}
else
{
if (format_settings.pretty.color)
if (color)
writeCString("\033[1m", out);
writeString(col.name, out);
if (format_settings.pretty.color)
if (color)
writeCString("\033[0m", out);
for (size_t k = 0; k < max_widths[i] - name_widths[i]; ++k)

View File

@ -13,7 +13,7 @@ namespace DB
class PrettyCompactBlockOutputFormat : public PrettyBlockOutputFormat
{
public:
PrettyCompactBlockOutputFormat(WriteBuffer & out_, const Block & header, const FormatSettings & format_settings_, bool mono_block_);
PrettyCompactBlockOutputFormat(WriteBuffer & out_, const Block & header, const FormatSettings & format_settings_, bool mono_block_, bool color);
String getName() const override { return "PrettyCompactBlockOutputFormat"; }
private:

View File

@ -48,18 +48,18 @@ void PrettySpaceBlockOutputFormat::writeChunk(const Chunk & chunk, PortKind port
for (ssize_t k = 0; k < std::max(0z, static_cast<ssize_t>(max_widths[i] - name_widths[i])); ++k)
writeChar(' ', out);
if (format_settings.pretty.color)
if (color)
writeCString("\033[1m", out);
writeString(col.name, out);
if (format_settings.pretty.color)
if (color)
writeCString("\033[0m", out);
}
else
{
if (format_settings.pretty.color)
if (color)
writeCString("\033[1m", out);
writeString(col.name, out);
if (format_settings.pretty.color)
if (color)
writeCString("\033[0m", out);
for (ssize_t k = 0; k < std::max(0z, static_cast<ssize_t>(max_widths[i] - name_widths[i])); ++k)

View File

@ -11,8 +11,8 @@ namespace DB
class PrettySpaceBlockOutputFormat : public PrettyBlockOutputFormat
{
public:
PrettySpaceBlockOutputFormat(WriteBuffer & out_, const Block & header, const FormatSettings & format_settings_, bool mono_block_)
: PrettyBlockOutputFormat(out_, header, format_settings_, mono_block_) {}
PrettySpaceBlockOutputFormat(WriteBuffer & out_, const Block & header, const FormatSettings & format_settings_, bool mono_block_, bool color_)
: PrettyBlockOutputFormat(out_, header, format_settings_, mono_block_, color_) {}
String getName() const override { return "PrettySpaceBlockOutputFormat"; }

View File

@ -228,21 +228,27 @@ public:
{
auto get_raw_read_buf = [&]() -> std::unique_ptr<ReadBuffer>
{
auto buf = std::make_unique<ReadBufferFromHDFS>(
hdfs_namenode_url,
current_path,
getContext()->getGlobalContext()->getConfigRef(),
getContext()->getReadSettings());
bool thread_pool_read = read_settings.remote_fs_method == RemoteFSReadMethod::threadpool;
if (thread_pool_read)
{
auto buf = std::make_unique<ReadBufferFromHDFS>(
hdfs_namenode_url,
current_path,
getContext()->getGlobalContext()->getConfigRef(),
getContext()->getReadSettings(),
/* read_until_position */0,
/* use_external_buffer */true);
return std::make_unique<AsynchronousReadBufferFromHDFS>(
getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER), read_settings, std::move(buf));
}
else
{
return buf;
return std::make_unique<ReadBufferFromHDFS>(
hdfs_namenode_url,
current_path,
getContext()->getGlobalContext()->getConfigRef(),
getContext()->getReadSettings());
}
};

View File

@ -1,6 +1,7 @@
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeNested.h>
#include <Common/escapeForFileName.h>
#include <Compression/CachedCompressedReadBuffer.h>
#include <Columns/ColumnArray.h>
@ -140,16 +141,29 @@ void IMergeTreeReader::evaluateMissingDefaults(Block additional_columns, Columns
}
}
/// Tells whether the requested subcolumn is the "size0" (offsets) subcolumn of a column
/// that belongs to a Nested structure, according to the columns of the part being read.
/// Always false for parts that are not wide.
bool IMergeTreeReader::isSubcolumnOffsetsOfNested(const String & name_in_storage, const String & subcolumn_name) const
{
    /// A separate subcolumn with offsets cannot be read from compact parts,
    /// so only wide parts and only the "size0" subcolumn are candidates.
    const bool can_be_shared_offsets = data_part_info_for_read->isWidePart() && subcolumn_name == "size0";

    return can_be_shared_offsets && Nested::isSubcolumnOfNested(name_in_storage, part_columns);
}
String IMergeTreeReader::getColumnNameInPart(const NameAndTypePair & required_column) const
{
auto name_in_storage = required_column.getNameInStorage();
if (alter_conversions->isColumnRenamed(name_in_storage))
{
name_in_storage = alter_conversions->getColumnOldName(name_in_storage);
return Nested::concatenateName(name_in_storage, required_column.getSubcolumnName());
}
auto subcolumn_name = required_column.getSubcolumnName();
return required_column.name;
if (alter_conversions->isColumnRenamed(name_in_storage))
name_in_storage = alter_conversions->getColumnOldName(name_in_storage);
/// A special case when we read subcolumn of shared offsets of Nested.
/// E.g. instead of requested column "n.arr1.size0" we must read column "n.size0" from disk.
if (isSubcolumnOffsetsOfNested(name_in_storage, subcolumn_name))
name_in_storage = Nested::splitName(name_in_storage).first;
return Nested::concatenateName(name_in_storage, subcolumn_name);
}
NameAndTypePair IMergeTreeReader::getColumnInPart(const NameAndTypePair & required_column) const

View File

@ -65,14 +65,14 @@ public:
protected:
/// Returns actual column name in part, which can differ from table metadata.
String getColumnNameInPart(const NameAndTypePair & required_column) const;
/// Returns actual column name and type in part, which can differ from table metadata.
NameAndTypePair getColumnInPart(const NameAndTypePair & required_column) const;
/// Returns actual serialization in part, which can differ from table metadata.
SerializationPtr getSerializationInPart(const NameAndTypePair & required_column) const;
/// Returns true if requested column is a subcolumn with offsets of Array which is part of Nested column.
bool isSubcolumnOffsetsOfNested(const String & name_in_storage, const String & subcolumn_name) const;
void checkNumberOfColumns(size_t num_columns_to_read) const;
String getMessageForDiagnosticOfBrokenPart(size_t from_mark, size_t max_rows_to_read) const;
/// avg_value_size_hints are used to reduce the number of reallocations when creating columns of variable size.

View File

@ -1,3 +1,4 @@
#include <iterator>
#include <Storages/MergeTree/MergeTreeReadPoolParallelReplicas.h>
@ -67,15 +68,11 @@ MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicas::getTask(size_t /*task_id
auto & current_task = buffered_ranges.front();
size_t part_idx = 0;
for (size_t index = 0; index < per_part_infos.size(); ++index)
{
if (per_part_infos[index]->data_part->info == current_task.info)
{
part_idx = index;
break;
}
}
auto part_it
= std::ranges::find_if(per_part_infos, [&current_task](const auto & part) { return part->data_part->info == current_task.info; });
if (part_it == per_part_infos.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Assignment contains an unknown part (current_task: {})", current_task.describe());
const size_t part_idx = std::distance(per_part_infos.begin(), part_it);
MarkRanges ranges_to_read;
size_t current_sum_marks = 0;

View File

@ -604,6 +604,8 @@ void DefaultCoordinator::processPartsFurther(
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::ParallelReplicasProcessingPartsMicroseconds);
auto replica_can_read_part = [&](auto replica, const auto & part) { return part_visibility[part.getPartNameV1()].contains(replica); };
for (const auto & part : all_parts_to_read)
{
if (current_marks_amount >= min_number_of_marks)
@ -627,7 +629,7 @@ void DefaultCoordinator::processPartsFurther(
= MarkRange{std::max(range.begin, segment_begin), std::min(range.end, segment_begin + mark_segment_size)};
const auto owner = computeConsistentHash(part.description.info.getPartNameV1(), segment_begin, scan_mode);
if (owner == replica_num)
if (owner == replica_num && replica_can_read_part(replica_num, part.description.info))
{
const auto taken = takeFromRange(cur_segment, min_number_of_marks, current_marks_amount, result);
if (taken == range.getNumberOfMarks())

View File

@ -14,7 +14,9 @@
#include <Columns/ColumnConst.h>
#include <Columns/ColumnSet.h>
#include <Functions/indexHint.h>
#include <Functions/IFunction.h>
#include <Functions/IFunctionAdaptors.h>
#include <Storages/KeyDescription.h>
@ -390,6 +392,15 @@ size_t RPNBuilderFunctionTreeNode::getArgumentsSize() const
}
else
{
// indexHint arguments are stored inside of `FunctionIndexHint` class,
// because they are used only for index analysis.
if (dag_node->function_base->getName() == "indexHint")
{
const auto * adaptor = typeid_cast<const FunctionToFunctionBaseAdaptor *>(dag_node->function_base.get());
const auto * index_hint = typeid_cast<const FunctionIndexHint *>(adaptor->getFunction().get());
return index_hint->getActions()->getOutputs().size();
}
return dag_node->children.size();
}
}
@ -409,6 +420,15 @@ RPNBuilderTreeNode RPNBuilderFunctionTreeNode::getArgumentAt(size_t index) const
}
else
{
// indexHint arguments are stored inside of `FunctionIndexHint` class,
// because they are used only for index analysis.
if (dag_node->function_base->getName() == "indexHint")
{
const auto * adaptor = typeid_cast<const FunctionToFunctionBaseAdaptor *>(dag_node->function_base.get());
const auto * index_hint = typeid_cast<const FunctionIndexHint *>(adaptor->getFunction().get());
return RPNBuilderTreeNode(index_hint->getActions()->getOutputs()[index], tree_context);
}
return RPNBuilderTreeNode(dag_node->children[index], tree_context);
}
}

View File

@ -2690,7 +2690,7 @@ ReplicatedMergeTreeQueue::addSubscriber(ReplicatedMergeTreeQueue::SubscriberCall
}
}
LOG_TEST(log, "Waiting for {} entries to be processed: {}", out_entry_names.size(), fmt::join(out_entry_names, ", "));
LOG_TRACE(log, "Waiting for {} entries to be processed: {}", out_entry_names.size(), fmt::join(out_entry_names, ", "));
}
auto it = subscribers.emplace(subscribers.end(), std::move(callback));

View File

@ -104,6 +104,8 @@ protected:
Chunk generate() override;
private:
NameAndTypePair getColumnOnDisk(const NameAndTypePair & column) const;
const size_t block_size;
const NamesAndTypesList columns;
const StorageLog & storage;
@ -149,6 +151,22 @@ private:
bool isFinished();
};
/// Returns the column that has to be read from disk in order to serve the requested column.
/// For every column except the shared offsets of Nested the requested column is returned as is.
NameAndTypePair LogSource::getColumnOnDisk(const NameAndTypePair & column) const
{
    const auto & collected_columns = storage.columns_with_collected_nested;
    const auto storage_name = column.getNameInStorage();

    /// Shared offsets of Nested are a special case:
    /// for the requested column "n.arr1.size0" the column "n.size0" is read from disk.
    const bool reads_shared_offsets
        = column.getSubcolumnName() == "size0" && Nested::isSubcolumnOfNested(storage_name, collected_columns);

    if (!reads_shared_offsets)
        return column;

    const auto nested_table_name = Nested::splitName(storage_name).first;
    const auto offsets_name = Nested::concatenateName(nested_table_name, column.getSubcolumnName());
    return collected_columns.getColumnOrSubcolumn(GetColumnsOptions::All, offsets_name);
}
Chunk LogSource::generate()
{
@ -169,19 +187,21 @@ Chunk LogSource::generate()
for (const auto & name_type : columns)
{
ColumnPtr column;
auto name_type_on_disk = getColumnOnDisk(name_type);
try
{
column = name_type.type->createColumn();
readData(name_type, column, max_rows_to_read, caches[name_type.getNameInStorage()]);
column = name_type_on_disk.type->createColumn();
readData(name_type_on_disk, column, max_rows_to_read, caches[name_type_on_disk.getNameInStorage()]);
}
catch (Exception & e)
{
e.addMessage("while reading column " + name_type.name + " at " + fullPath(storage.disk, storage.table_path));
e.addMessage("while reading column " + name_type_on_disk.name + " at " + fullPath(storage.disk, storage.table_path));
throw;
}
if (!column->empty())
res.insert(ColumnWithTypeAndName(column, name_type.type, name_type.name));
res.insert(ColumnWithTypeAndName(column, name_type_on_disk.type, name_type_on_disk.name));
}
if (res)
@ -600,6 +620,7 @@ StorageLog::StorageLog(
}
}
columns_with_collected_nested = ColumnsDescription{Nested::collect(columns_.getAll())};
total_bytes = file_checker.getTotalSize();
}
@ -820,10 +841,6 @@ Pipe StorageLog::read(
if (num_streams > max_streams)
num_streams = max_streams;
auto options = GetColumnsOptions(GetColumnsOptions::All).withSubcolumns();
auto all_columns = storage_snapshot->getColumnsByNames(options, column_names);
all_columns = Nested::convertToSubcolumns(all_columns);
std::vector<size_t> offsets;
offsets.resize(num_data_files, 0);
@ -840,6 +857,12 @@ Pipe StorageLog::read(
ReadSettings read_settings = local_context->getReadSettings();
Pipes pipes;
/// Converting to subcolumns of Nested is needed for
/// correct reading of parts of Nested with shared offsets.
auto options = GetColumnsOptions(GetColumnsOptions::All).withSubcolumns();
auto all_columns = storage_snapshot->getColumnsByNames(options, column_names);
all_columns = Nested::convertToSubcolumns(all_columns);
for (size_t stream = 0; stream < num_streams; ++stream)
{
if (use_marks_file)

View File

@ -133,6 +133,9 @@ private:
size_t num_data_files = 0;
std::map<String, DataFile *> data_files_by_names;
/// The same as metadata->columns but after call of Nested::collect().
ColumnsDescription columns_with_collected_nested;
/// The Log engine uses the marks file, and the TinyLog engine doesn't.
const bool use_marks_file;

View File

@ -1,28 +1,24 @@
#!/usr/bin/env python3
from base64 import b64decode
from collections import namedtuple
from typing import Any, Dict, List, Optional, Tuple
from threading import Thread
from queue import Queue
import json
import re
import time
from base64 import b64decode
from collections import namedtuple
from queue import Queue
from threading import Thread
from typing import Any, Dict, List, Optional, Tuple
import requests # type: ignore
from lambda_shared.pr import CATEGORY_TO_LABEL, check_pr_description
from lambda_shared.token import get_cached_access_token
NEED_RERUN_ON_EDITED = {
"PullRequestCI",
"DocsCheck",
}
NEED_RERUN_OR_CANCELL_WORKFLOWS = {
"BackportPR",
}.union(NEED_RERUN_ON_EDITED)
"DocsCheck",
"MasterCI",
"PullRequestCI",
}
MAX_RETRY = 5

View File

@ -0,0 +1,41 @@
import time
def check_on_cluster(
    nodes,
    expected,
    *,
    what,
    cluster_name="test_auto_cluster",
    msg=None,
    retries=5,
    query_params=None,
):
    """
    Select data from `system.clusters` on specified nodes and check the result.

    Every node is queried with
    ``SELECT {what} FROM system.clusters WHERE cluster = '{cluster_name}'``
    and the answer is converted with ``int()``. A node that has already returned
    `expected` is not queried again on later attempts.

    :param nodes: iterable of objects exposing ``.name`` and ``.query(text, **kwargs)``
    :param expected: integer value every node must report
    :param what: SQL expression to select (e.g. ``"count()"``)
    :param cluster_name: value compared against the ``cluster`` column
    :param msg: optional prefix for the error message; defaults to ``Wrong '<what>' result``
    :param retries: number of attempts, must be within 1..6
    :param query_params: extra keyword arguments forwarded to ``node.query``
        (e.g. ``{"password": "..."}``); ``None`` means no extra arguments
    :raises Exception: if some node still reports a different value after the last attempt
    """
    # Sleeps grow as 2**retry seconds, so the maximum of 6 attempts sleeps
    # 2 + 4 + 8 + 16 + 32 = 62 seconds in total.
    assert 1 <= retries <= 6

    # A `None` sentinel instead of a `{}` default: a dict literal in the signature
    # would be a single object shared between all calls of the function.
    if query_params is None:
        query_params = {}

    # The query does not depend on the node or the attempt, so build it once.
    query_text = f"SELECT {what} FROM system.clusters WHERE cluster = '{cluster_name}'"

    node_results = {}
    for retry in range(1, retries + 1):
        for node in nodes:
            if node_results.get(node.name) == expected:
                # do not retry node after success
                continue
            node_results[node.name] = int(node.query(query_text, **query_params))

        if all(actual == expected for actual in node_results.values()):
            break

        print(f"Retry {retry}/{retries} unsuccessful, result: {node_results}")

        # No point in sleeping after the last attempt, the error is raised right away.
        if retry != retries:
            time.sleep(2**retry)
    else:
        # The loop finished without `break`: no attempt produced the expected result everywhere.
        msg = msg or f"Wrong '{what}' result"
        raise Exception(
            f"{msg}: {node_results}, expected: {expected} (after {retries} retries)"
        )

View File

@ -0,0 +1,21 @@
<clickhouse>
<allow_experimental_cluster_discovery>1</allow_experimental_cluster_discovery>
<remote_servers>
<test_auto_cluster_with_pwd>
<discovery>
<path>/clickhouse/discovery/test_auto_cluster_with_pwd</path>
<user>user1</user>
<password>password123</password>
</discovery>
</test_auto_cluster_with_pwd>
<test_auto_cluster_with_wrong_pwd>
<discovery>
<path>/clickhouse/discovery/test_auto_cluster_with_wrong_pwd</path>
<user>user1</user>
<password>wrongpass1234</password>
</discovery>
</test_auto_cluster_with_wrong_pwd>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,22 @@
<!-- Cluster discovery configuration with interserver secrets: declares two auto-discovered clusters. -->
<clickhouse>
<allow_experimental_cluster_discovery>1</allow_experimental_cluster_discovery>
<remote_servers>
<test_auto_cluster_with_secret>
<discovery>
<path>/clickhouse/discovery/test_auto_cluster_with_secret</path>
<secret>secret123</secret>
</discovery>
</test_auto_cluster_with_secret>
<!-- NOTE(review): the secret below presumably has to differ from the one configured for the same
     cluster on the other node, so that queries to this cluster fail; confirm against the sibling config. -->
<test_auto_cluster_with_wrong_secret>
<discovery>
<path>/clickhouse/discovery/test_auto_cluster_with_wrong_secret</path>
<secret>correctsecret321</secret>
</discovery>
</test_auto_cluster_with_wrong_secret>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,22 @@
<!-- Cluster discovery configuration with interserver secrets: declares two auto-discovered clusters. -->
<clickhouse>
<allow_experimental_cluster_discovery>1</allow_experimental_cluster_discovery>
<remote_servers>
<test_auto_cluster_with_secret>
<discovery>
<path>/clickhouse/discovery/test_auto_cluster_with_secret</path>
<secret>secret123</secret>
</discovery>
</test_auto_cluster_with_secret>
<!-- NOTE(review): the secret below presumably has to differ from the one configured for the same
     cluster on the other node, so that queries to this cluster fail; confirm against the sibling config. -->
<test_auto_cluster_with_wrong_secret>
<discovery>
<path>/clickhouse/discovery/test_auto_cluster_with_wrong_secret</path>
<secret>wrongsecret333</secret>
</discovery>
</test_auto_cluster_with_wrong_secret>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,17 @@
<clickhouse>
<profiles>
<default>
</default>
</profiles>
<users>
<default>
<password>passwordAbc</password>
<profile>default</profile>
</default>
<user1>
<password>password123</password>
<profile>default</profile>
</user1>
</users>
</clickhouse>

View File

@ -1,7 +1,8 @@
import pytest
import functools
import time
from .common import check_on_cluster
from helpers.cluster import ClickHouseCluster
@ -36,39 +37,6 @@ def start_cluster():
cluster.shutdown()
def check_on_cluster(
nodes, expected, *, what, cluster_name="test_auto_cluster", msg=None, retries=5
):
"""
Select data from `system.clusters` on specified nodes and check the result
"""
assert 1 <= retries <= 6
node_results = {}
for retry in range(1, retries + 1):
for node in nodes:
if node_results.get(node.name) == expected:
# do not retry node after success
continue
query_text = (
f"SELECT {what} FROM system.clusters WHERE cluster = '{cluster_name}'"
)
node_results[node.name] = int(node.query(query_text))
if all(actual == expected for actual in node_results.values()):
break
print(f"Retry {retry}/{retries} unsuccessful, result: {node_results}")
if retry != retries:
time.sleep(2**retry)
else:
msg = msg or f"Wrong '{what}' result"
raise Exception(
f"{msg}: {node_results}, expected: {expected} (after {retries} retries)"
)
def test_cluster_discovery_startup_and_stop(start_cluster):
"""
Start cluster, check nodes count in system.clusters,

View File

@ -0,0 +1,72 @@
import pytest
from .common import check_on_cluster
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
nodes = {
"node0": cluster.add_instance(
"node0",
main_configs=["config/config_with_pwd.xml", "config/config_with_secret1.xml"],
user_configs=["config/users.d/users_with_pwd.xml"],
stay_alive=True,
with_zookeeper=True,
),
"node1": cluster.add_instance(
"node1",
main_configs=["config/config_with_pwd.xml", "config/config_with_secret2.xml"],
user_configs=["config/users.d/users_with_pwd.xml"],
stay_alive=True,
with_zookeeper=True,
),
}
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_connect_with_password(start_cluster):
    """
    Check discovered clusters that are configured with a user/password or with a secret.

    Clusters with matching credentials must be queryable from node0 and return
    one row per host; clusters with mismatching credentials must produce an error.
    """
    first_node = nodes["node0"]
    second_node = nodes["node1"]

    # Both nodes must see every instance in the password-protected cluster.
    check_on_cluster(
        [first_node, second_node],
        len(nodes),
        cluster_name="test_auto_cluster_with_pwd",
        what="count()",
        msg="Wrong nodes count in cluster",
        query_params={"password": "passwordAbc"},
    )

    # Correct password: sum(0, 1, 2) == 3 is returned once per host.
    result = first_node.query(
        "SELECT sum(number) FROM clusterAllReplicas('test_auto_cluster_with_pwd', numbers(3)) GROUP BY hostname()",
        password="passwordAbc",
    )
    assert result == "3\n3\n", result

    # Wrong password in the cluster definition: the query must be rejected.
    result = first_node.query_and_get_error(
        "SELECT sum(number) FROM clusterAllReplicas('test_auto_cluster_with_wrong_pwd', numbers(3)) GROUP BY hostname()",
        password="passwordAbc",
    )
    assert "Authentication failed" in result, result

    # Matching secret on both nodes: same result as with the correct password.
    result = first_node.query(
        "SELECT sum(number) FROM clusterAllReplicas('test_auto_cluster_with_secret', numbers(3)) GROUP BY hostname()",
        password="passwordAbc",
    )
    assert result == "3\n3\n", result

    result = first_node.query_and_get_error(
        "SELECT sum(number) FROM clusterAllReplicas('test_auto_cluster_with_wrong_secret', numbers(3)) GROUP BY hostname()",
        password="passwordAbc",
    )
    # With an incorrect secret, we don't get "Authentication failed", but the connection is simply dropped.
    # So, we get messages like "Connection reset by peer" or "Attempt to read after eof".
    # We only check that an error occurred and the message is not empty.
    assert result

View File

@ -0,0 +1,19 @@
<clickhouse>
<query_log>
<database>system</database>
<table>query_log</table>
<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<max_size_rows>1048576</max_size_rows>
<reserved_size_rows>8192</reserved_size_rows>
<buffer_size_rows_flush_threshold>524288</buffer_size_rows_flush_threshold>
<flush_on_crash>false</flush_on_crash>
</query_log>
<query_masking_rules>
<rule>
<regexp>TOPSECRET.TOPSECRET</regexp>
<replace>[hidden]</replace>
</rule>
</query_masking_rules>
</clickhouse>

View File

@ -0,0 +1,12 @@
<clickhouse>
<query_log>
<database>system</database>
<table>query_log</table>
<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<max_size_rows>1048576</max_size_rows>
<reserved_size_rows>8192</reserved_size_rows>
<buffer_size_rows_flush_threshold>524288</buffer_size_rows_flush_threshold>
<flush_on_crash>false</flush_on_crash>
</query_log>
</clickhouse>

View File

@ -0,0 +1,57 @@
import pytest
import os
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry, assert_logs_contain_with_retry
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance("node", user_configs=["configs/empty_settings.xml"])
@pytest.fixture(scope="module", autouse=True)
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
@pytest.fixture(autouse=True)
def reset_to_normal_settings_after_test():
    """
    Install the empty settings file as config.d/z.xml and reload the server config.

    Despite the name, everything happens before the `yield`, i.e. before the
    test body runs; nothing is done after the test.
    """
    empty_settings_path = os.path.join(SCRIPT_DIR, "configs/empty_settings.xml")
    node.copy_file_to_container(
        empty_settings_path,
        "/etc/clickhouse-server/config.d/z.xml",
    )
    node.query("SYSTEM RELOAD CONFIG")
    yield
# @pytest.mark.parametrize("reload_strategy", ["force", "timeout"])
def test_reload_query_masking_rules():
    """
    Check that query masking rules are picked up by SYSTEM RELOAD CONFIG.

    The same query is run before and after a config with a masking rule is
    installed; the server log must contain the raw text first and only the
    masked text afterwards.
    """
    secret_query = "SELECT 'TOPSECRET.TOPSECRET'"
    masked_pattern = r"SELECT '\[hidden\]'"
    changed_settings_path = os.path.join(SCRIPT_DIR, "configs/changed_settings.xml")

    # Without masking rules the query text reaches the log unchanged.
    node.query(secret_query)
    assert_logs_contain_with_retry(node, secret_query)
    assert not node.contains_in_log(masked_pattern)

    # Start from a fresh log so that the lines written above do not affect the checks below.
    node.rotate_logs()

    node.copy_file_to_container(
        changed_settings_path,
        "/etc/clickhouse-server/config.d/z.xml",
    )
    node.query("SYSTEM RELOAD CONFIG")

    # After the reload the same query must appear in the log only in the masked form.
    node.query(secret_query)
    assert_logs_contain_with_retry(node, masked_pattern)
    assert not node.contains_in_log(secret_query)

    node.rotate_logs()

View File

@ -1079,7 +1079,7 @@ def test_startup_without_zk(started_cluster):
err = main_node.query_and_get_error(
"CREATE DATABASE startup ENGINE = Replicated('/clickhouse/databases/startup', 'shard1', 'replica1');"
)
assert "ZooKeeper" in err
assert "ZooKeeper" in err or "Coordination::Exception" in err
main_node.query(
"CREATE DATABASE startup ENGINE = Replicated('/clickhouse/databases/startup', 'shard1', 'replica1');"
)

View File

@ -4450,7 +4450,7 @@ def test_block_based_formats_1(kafka_cluster):
kafka_group_name = '{topic}',
kafka_format = 'PrettySpace';
INSERT INTO test.kafka SELECT number * 10 as key, number * 100 as value FROM numbers(5) settings max_block_size=2, optimize_trivial_insert_select=0;
INSERT INTO test.kafka SELECT number * 10 as key, number * 100 as value FROM numbers(5) settings max_block_size=2, optimize_trivial_insert_select=0, output_format_pretty_color=1;
"""
)

View File

@ -3156,7 +3156,7 @@ def test_block_based_formats_1(rabbitmq_cluster):
)
instance.query(
"INSERT INTO test.rabbitmq SELECT number * 10 as key, number * 100 as value FROM numbers(5) settings max_block_size=2, optimize_trivial_insert_select=0;"
"INSERT INTO test.rabbitmq SELECT number * 10 as key, number * 100 as value FROM numbers(5) settings max_block_size=2, optimize_trivial_insert_select=0, output_format_pretty_color=1;"
)
insert_messages = []

View File

@ -45,11 +45,14 @@
dest-file (str dest-folder "/clickhouse")
dest-symlink (str root-folder "/" expected-file-name)
wget-opts (concat cu/std-wget-opts [:-O dest-file])]
(when-not (cu/exists? dest-file)
(info "Downloading" url)
(do (c/exec :mkdir :-p dest-folder)
(c/cd dest-folder
(if-not (cu/exists? dest-file)
(do
(info "Downloading" url)
(do (c/exec :mkdir :-p dest-folder)
(c/cd dest-folder
(cu/wget-helper! wget-opts url))))
(info "Binary is already downloaded"))
(c/exec :rm :-rf dest-symlink)
(c/exec :ln :-s dest-file dest-symlink)
dest-symlink))

View File

@ -1 +1,2 @@
SET output_format_pretty_color=1;
SELECT (toDate('2000-01-01'), toDate('2000-01-01')) AS x FORMAT PrettyCompact;

View File

@ -1 +1 @@
SELECT arr, count() AS c FROM (SELECT arrayMap(x -> x % 2, groupArray(number)) AS arr FROM (SELECT number FROM system.numbers LIMIT 10000) GROUP BY number % ((number * 0xABCDEF0123456789 % 1234) + 1)) GROUP BY arr ORDER BY c DESC, arr ASC;
SELECT arr, count() AS c FROM (SELECT arrayMap(x -> x % 2, arraySort(groupArray(number))) AS arr FROM (SELECT number FROM system.numbers LIMIT 10000) GROUP BY number % ((number * 0xABCDEF0123456789 % 1234) + 1)) GROUP BY arr ORDER BY c DESC, arr ASC;

Some files were not shown because too many files have changed in this diff Show More