mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Merge master
This commit is contained in:
commit
ca4363e2e3
6
.github/workflows/release.yml
vendored
6
.github/workflows/release.yml
vendored
@ -1,4 +1,4 @@
|
||||
name: ReleaseWorkflow
|
||||
name: PublishedReleaseCI
|
||||
# - Gets artifacts from S3
|
||||
# - Sends it to JFROG Artifactory
|
||||
# - Adds them to the release assets
|
||||
@ -15,7 +15,7 @@ jobs:
|
||||
- name: Set envs
|
||||
run: |
|
||||
cat >> "$GITHUB_ENV" << 'EOF'
|
||||
JFROG_API_KEY=${{ secrets.JFROG_KEY_API_PACKAGES }}
|
||||
JFROG_API_KEY=${{ secrets.JFROG_ARTIFACTORY_API_KEY }}
|
||||
TEMP_PATH=${{runner.temp}}/release_packages
|
||||
REPO_COPY=${{runner.temp}}/release_packages/ClickHouse
|
||||
EOF
|
||||
@ -30,7 +30,7 @@ jobs:
|
||||
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
|
||||
cd "$REPO_COPY"
|
||||
python3 ./tests/ci/push_to_artifactory.py --release "${{ github.ref }}" \
|
||||
--commit '${{ github.sha }}' --all
|
||||
--commit '${{ github.sha }}' --artifactory-url "${{ secrets.JFROG_ARTIFACTORY_URL }}" --all
|
||||
- name: Upload packages to release assets
|
||||
uses: svenstaro/upload-release-action@v2
|
||||
with:
|
||||
|
2
.github/workflows/release_branches.yml
vendored
2
.github/workflows/release_branches.yml
vendored
@ -1,4 +1,4 @@
|
||||
name: ReleaseCI
|
||||
name: ReleaseBranchCI
|
||||
|
||||
env:
|
||||
# Force the stdout and stderr streams to be unbuffered
|
||||
|
2
contrib/NuRaft
vendored
2
contrib/NuRaft
vendored
@ -1 +1 @@
|
||||
Subproject commit 33f60f961d4914441b684af43e9e5535078ba54b
|
||||
Subproject commit bdba298189e29995892de78dcecf64d127444e81
|
18
docs/changelogs/v22.8.4.7-lts.md
Normal file
18
docs/changelogs/v22.8.4.7-lts.md
Normal file
@ -0,0 +1,18 @@
|
||||
---
|
||||
sidebar_position: 1
|
||||
sidebar_label: 2022
|
||||
---
|
||||
|
||||
# 2022 Changelog
|
||||
|
||||
### ClickHouse release v22.8.4.7-lts (baad27bcd2f) FIXME as compared to v22.8.3.13-lts (6a15b73faea)
|
||||
|
||||
#### Bug Fix (user-visible misbehavior in official stable or prestable release)
|
||||
|
||||
* Backported in [#40760](https://github.com/ClickHouse/ClickHouse/issues/40760): Fix possible error 'Decimal math overflow' while parsing DateTime64. [#40546](https://github.com/ClickHouse/ClickHouse/pull/40546) ([Kruglov Pavel](https://github.com/Avogar)).
|
||||
* Backported in [#40811](https://github.com/ClickHouse/ClickHouse/issues/40811): In [#40595](https://github.com/ClickHouse/ClickHouse/issues/40595) it was reported that the `host_regexp` functionality was not working properly with a name to address resolution in `/etc/hosts`. It's fixed. [#40769](https://github.com/ClickHouse/ClickHouse/pull/40769) ([Arthur Passos](https://github.com/arthurpassos)).
|
||||
|
||||
#### NOT FOR CHANGELOG / INSIGNIFICANT
|
||||
|
||||
* Migrate artifactory [#40831](https://github.com/ClickHouse/ClickHouse/pull/40831) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
|
||||
|
@ -1,10 +1,10 @@
|
||||
---
|
||||
slug: /en/operations/backup
|
||||
sidebar_position: 49
|
||||
sidebar_label: Data Backup
|
||||
sidebar_label: Data backup and restore
|
||||
---
|
||||
|
||||
# Data Backup
|
||||
# Data backup and restore
|
||||
|
||||
While [replication](../engines/table-engines/mergetree-family/replication.md) provides protection from hardware failures, it does not protect against human errors: accidental deletion of data, deletion of the wrong table or a table on the wrong cluster, and software bugs that result in incorrect data processing or data corruption. In many cases mistakes like these will affect all replicas. ClickHouse has built-in safeguards to prevent some types of mistakes — for example, by default [you can’t just drop tables with a MergeTree-like engine containing more than 50 Gb of data](server-configuration-parameters/settings.md#max-table-size-to-drop). However, these safeguards do not cover all possible cases and can be circumvented.
|
||||
|
||||
@ -16,21 +16,181 @@ Each company has different resources available and business requirements, so the
|
||||
Keep in mind that if you backed something up and never tried to restore it, chances are that restore will not work properly when you actually need it (or at least it will take longer than business can tolerate). So whatever backup approach you choose, make sure to automate the restore process as well, and practice it on a spare ClickHouse cluster regularly.
|
||||
:::
|
||||
|
||||
## Duplicating Source Data Somewhere Else {#duplicating-source-data-somewhere-else}
|
||||
## Configure a backup destination
|
||||
|
||||
In the examples below you will see the backup destination specified like `Disk('backups', '1.zip')`. To prepare the destination add a file to `/etc/clickhouse-server/config.d/backup_disk.xml` specifying the backup destination. For example, this file defines disk named `backups` and then adds that disk to the **backups > allowed_disk** list:
|
||||
|
||||
```xml
|
||||
<clickhouse>
|
||||
<storage_configuration>
|
||||
<disks>
|
||||
<!--highlight-next-line -->
|
||||
<backups>
|
||||
<type>local</type>
|
||||
<path>/backups/</path>
|
||||
</backups>
|
||||
</disks>
|
||||
</storage_configuration>
|
||||
<!--highlight-start -->
|
||||
<backups>
|
||||
<allowed_disk>backups</allowed_disk>
|
||||
<allowed_path>/backups/</allowed_path>
|
||||
</backups>
|
||||
<!--highlight-end -->
|
||||
</clickhouse>
|
||||
```
|
||||
|
||||
## Parameters
|
||||
|
||||
Backups can be either full or incremental, and can include tables (including materialized views, projections, and dictionaries), and databases. Backups can be synchronous (default) or asynchronous. They can be compressed. Backups can be password protected.
|
||||
|
||||
The BACKUP and RESTORE statements take a list of DATABASE and TABLE names, a destination (or source), options and settings:
|
||||
- The destination for the backup, or the source for the restore. This is based on the disk defined earlier. For example `Disk('backups', 'filename.zip')`
|
||||
- ASYNC: backup or restore asynchronously
|
||||
- PARTITIONS: a list of partitions to restore
|
||||
- SETTINGS:
|
||||
- [`compression_method`](en/sql-reference/statements/create/table/#column-compression-codecs) and compression_level
|
||||
- `password` for the file on disk
|
||||
- `base_backup`: the destination of the previous backup of this source. For example, `Disk('backups', '1.zip')`
|
||||
|
||||
## Usage examples
|
||||
|
||||
Backup and then restore a table:
|
||||
```
|
||||
BACKUP TABLE test.table TO Disk('backups', '1.zip')
|
||||
```
|
||||
|
||||
Corresponding restore:
|
||||
```
|
||||
RESTORE TABLE test.table FROM Disk('backups', '1.zip')
|
||||
```
|
||||
|
||||
:::note
|
||||
The above RESTORE would fail if the table `test.table` contains data, you would have to drop the table in order to test the RESTORE, or use the setting `allow_non_empty_tables=true`:
|
||||
```
|
||||
RESTORE TABLE test.table FROM Disk('backups', '1.zip')
|
||||
SETTINGS allow_non_empty_tables=true
|
||||
```
|
||||
:::
|
||||
|
||||
Tables can be restored, or backed up, with new names:
|
||||
```
|
||||
RESTORE TABLE test.table AS test.table2 FROM Disk('backups', '1.zip')
|
||||
```
|
||||
|
||||
```
|
||||
BACKUP TABLE test.table3 AS test.table4 TO Disk('backups', '2.zip')
|
||||
```
|
||||
|
||||
## Incremental backups
|
||||
|
||||
Incremental backups can be taken by specifying the `base_backup`.
|
||||
:::note
|
||||
Incremental backups depend on the base backup. The base backup must be kept available in order to be able to restore from an incremental backup.
|
||||
:::
|
||||
|
||||
Incrementally store new data. The setting `base_backup` causes data since a previous backup to `Disk('backups', 'd.zip')` to be stored to `Disk('backups', 'incremental-a.zip')`:
|
||||
```
|
||||
BACKUP TABLE test.table TO Disk('backups', 'incremental-a.zip')
|
||||
SETTINGS base_backup = Disk('backups', 'd.zip')
|
||||
```
|
||||
|
||||
Restore all data from the incremental backup and the base_backup into a new table `test.table2`:
|
||||
```
|
||||
RESTORE TABLE test.table AS test.table2
|
||||
FROM Disk('backups', 'incremental-a.zip');
|
||||
```
|
||||
|
||||
## Assign a password to the backup
|
||||
|
||||
Backups written to disk can have a password applied to the file:
|
||||
```
|
||||
BACKUP TABLE test.table
|
||||
TO Disk('backups', 'password-protected.zip')
|
||||
SETTINGS password='qwerty'
|
||||
```
|
||||
|
||||
Restore:
|
||||
```
|
||||
RESTORE TABLE test.table
|
||||
FROM Disk('backups', 'password-protected.zip')
|
||||
SETTINGS password='qwerty'
|
||||
```
|
||||
|
||||
## Compression settings
|
||||
|
||||
If you would like to specify the compression method or level:
|
||||
```
|
||||
BACKUP TABLE test.table
|
||||
TO Disk('backups', 'filename.zip')
|
||||
SETTINGS compression_method='lzma', compression_level=3
|
||||
```
|
||||
|
||||
## Restore specific partitions
|
||||
If specific partitions associated with a table need to be restored these can be specified. To restore partitions 1 and 4 from backup:
|
||||
```
|
||||
RESTORE TABLE test.table PARTITIONS '2', '3'
|
||||
FROM Disk('backups', 'filename.zip')
|
||||
```
|
||||
|
||||
## Check the status of backups
|
||||
|
||||
The backup command returns an `id` and `status`, and that `id` can be used to get the status of the backup. This is very useful to check the progress of long ASYNC backups. The example below shows a failure that happened when trying to overwrite an existing backup file:
|
||||
```sql
|
||||
BACKUP TABLE helloworld.my_first_table TO Disk('backups', '1.zip') ASYNC
|
||||
```
|
||||
```response
|
||||
┌─id───────────────────────────────────┬─status──────────┐
|
||||
│ 7678b0b3-f519-4e6e-811f-5a0781a4eb52 │ CREATING_BACKUP │
|
||||
└──────────────────────────────────────┴─────────────────┘
|
||||
|
||||
1 row in set. Elapsed: 0.001 sec.
|
||||
```
|
||||
|
||||
```
|
||||
SELECT
|
||||
*
|
||||
FROM system.backups
|
||||
where id='7678b0b3-f519-4e6e-811f-5a0781a4eb52'
|
||||
FORMAT Vertical
|
||||
```
|
||||
```response
|
||||
Row 1:
|
||||
──────
|
||||
id: 7678b0b3-f519-4e6e-811f-5a0781a4eb52
|
||||
name: Disk('backups', '1.zip')
|
||||
#highlight-next-line
|
||||
status: BACKUP_FAILED
|
||||
num_files: 0
|
||||
uncompressed_size: 0
|
||||
compressed_size: 0
|
||||
#highlight-next-line
|
||||
error: Code: 598. DB::Exception: Backup Disk('backups', '1.zip') already exists. (BACKUP_ALREADY_EXISTS) (version 22.8.2.11 (official build))
|
||||
start_time: 2022-08-30 09:21:46
|
||||
end_time: 2022-08-30 09:21:46
|
||||
|
||||
1 row in set. Elapsed: 0.002 sec.
|
||||
```
|
||||
|
||||
## Alternatives
|
||||
|
||||
ClickHouse stores data on disk, and there are many ways to backup disks. These are some alternatives that have been used in the past, and that may fit in well in your environment.
|
||||
|
||||
### Duplicating Source Data Somewhere Else {#duplicating-source-data-somewhere-else}
|
||||
|
||||
Often data that is ingested into ClickHouse is delivered through some sort of persistent queue, such as [Apache Kafka](https://kafka.apache.org). In this case it is possible to configure an additional set of subscribers that will read the same data stream while it is being written to ClickHouse and store it in cold storage somewhere. Most companies already have some default recommended cold storage, which could be an object store or a distributed filesystem like [HDFS](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html).
|
||||
|
||||
## Filesystem Snapshots {#filesystem-snapshots}
|
||||
### Filesystem Snapshots {#filesystem-snapshots}
|
||||
|
||||
Some local filesystems provide snapshot functionality (for example, [ZFS](https://en.wikipedia.org/wiki/ZFS)), but they might not be the best choice for serving live queries. A possible solution is to create additional replicas with this kind of filesystem and exclude them from the [Distributed](../engines/table-engines/special/distributed.md) tables that are used for `SELECT` queries. Snapshots on such replicas will be out of reach of any queries that modify data. As a bonus, these replicas might have special hardware configurations with more disks attached per server, which would be cost-effective.
|
||||
|
||||
## clickhouse-copier {#clickhouse-copier}
|
||||
### clickhouse-copier {#clickhouse-copier}
|
||||
|
||||
[clickhouse-copier](../operations/utilities/clickhouse-copier.md) is a versatile tool that was initially created to re-shard petabyte-sized tables. It can also be used for backup and restore purposes because it reliably copies data between ClickHouse tables and clusters.
|
||||
|
||||
For smaller volumes of data, a simple `INSERT INTO ... SELECT ...` to remote tables might work as well.
|
||||
|
||||
## Manipulations with Parts {#manipulations-with-parts}
|
||||
### Manipulations with Parts {#manipulations-with-parts}
|
||||
|
||||
ClickHouse allows using the `ALTER TABLE ... FREEZE PARTITION ...` query to create a local copy of table partitions. This is implemented using hardlinks to the `/var/lib/clickhouse/shadow/` folder, so it usually does not consume extra disk space for old data. The created copies of files are not handled by ClickHouse server, so you can just leave them there: you will have a simple backup that does not require any additional external system, but it will still be prone to hardware issues. For this reason, it’s better to remotely copy them to another location and then remove the local copies. Distributed filesystems and object stores are still a good options for this, but normal attached file servers with a large enough capacity might work as well (in this case the transfer will occur via the network filesystem or maybe [rsync](https://en.wikipedia.org/wiki/Rsync)).
|
||||
Data can be restored from backup using the `ALTER TABLE ... ATTACH PARTITION ...`
|
||||
@ -39,4 +199,3 @@ For more information about queries related to partition manipulations, see the [
|
||||
|
||||
A third-party tool is available to automate this approach: [clickhouse-backup](https://github.com/AlexAkulov/clickhouse-backup).
|
||||
|
||||
[Original article](https://clickhouse.com/docs/en/operations/backup/) <!--hide-->
|
||||
|
@ -1069,7 +1069,7 @@ Formats a Time according to the given Format string. Format is a constant expres
|
||||
**Syntax**
|
||||
|
||||
``` sql
|
||||
formatDateTime(Time, Format\[, Timezone\])
|
||||
formatDateTime(Time, Format[, Timezone])
|
||||
```
|
||||
|
||||
**Returned value(s)**
|
||||
@ -1105,6 +1105,7 @@ Using replacement fields, you can define a pattern for the resulting string. “
|
||||
| %w | weekday as a decimal number with Sunday as 0 (0-6) | 2 |
|
||||
| %y | Year, last two digits (00-99) | 18 |
|
||||
| %Y | Year | 2018 |
|
||||
| %z | Time offset from UTC as +HHMM or -HHMM | -0500 |
|
||||
| %% | a % sign | % |
|
||||
|
||||
**Example**
|
||||
|
@ -1017,7 +1017,7 @@ SELECT timeSlots(toDateTime64('1980-12-12 21:01:02.1234', 4, 'UTC'), toDecimal64
|
||||
**Синтаксис**
|
||||
|
||||
``` sql
|
||||
formatDateTime(Time, Format\[, Timezone\])
|
||||
formatDateTime(Time, Format[, Timezone])
|
||||
```
|
||||
|
||||
**Возвращаемое значение**
|
||||
|
@ -956,7 +956,7 @@ SELECT
|
||||
**语法**
|
||||
|
||||
``` sql
|
||||
formatDateTime(Time, Format\[, Timezone\])
|
||||
formatDateTime(Time, Format[, Timezone])
|
||||
```
|
||||
|
||||
**返回值**
|
||||
|
@ -736,7 +736,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
|
||||
std::vector<ProtocolServerAdapter> servers_to_start_before_tables;
|
||||
/// This object will periodically calculate some metrics.
|
||||
AsynchronousMetrics async_metrics(
|
||||
global_context, config().getUInt("asynchronous_metrics_update_period_s", 1),
|
||||
global_context,
|
||||
config().getUInt("asynchronous_metrics_update_period_s", 1),
|
||||
config().getUInt("asynchronous_heavy_metrics_update_period_s", 120),
|
||||
[&]() -> std::vector<ProtocolServerMetrics>
|
||||
{
|
||||
std::vector<ProtocolServerMetrics> metrics;
|
||||
|
@ -65,9 +65,31 @@
|
||||
in specified format like JSON.
|
||||
For example, as below:
|
||||
{"date_time":"1650918987.180175","thread_name":"#1","thread_id":"254545","level":"Trace","query_id":"","logger_name":"BaseDaemon","message":"Received signal 2","source_file":"../base/daemon/BaseDaemon.cpp; virtual void SignalListener::run()","source_line":"192"}
|
||||
To enable JSON logging support, just uncomment <formatting> tag below.
|
||||
To enable JSON logging support, please uncomment the entire <formatting> tag below.
|
||||
|
||||
a) You can modify key names by changing values under tag values inside <names> tag.
|
||||
For example, to change DATE_TIME to MY_DATE_TIME, you can do like:
|
||||
<date_time>MY_DATE_TIME</date_time>
|
||||
b) You can stop unwanted log properties to appear in logs. To do so, you can simply comment out (recommended)
|
||||
that property from this file.
|
||||
For example, if you do not want your log to print query_id, you can comment out only <query_id> tag.
|
||||
However, if you comment out all the tags under <names>, the program will print default values for as
|
||||
below.
|
||||
-->
|
||||
<!-- <formatting>json</formatting> -->
|
||||
<!-- <formatting>
|
||||
<type>json</type>
|
||||
<names>
|
||||
<date_time>date_time</date_time>
|
||||
<thread_name>thread_name</thread_name>
|
||||
<thread_id>thread_id</thread_id>
|
||||
<level>level</level>
|
||||
<query_id>query_id</query_id>
|
||||
<logger_name>logger_name</logger_name>
|
||||
<message>message</message>
|
||||
<source_file>source_file</source_file>
|
||||
<source_line>source_line</source_line>
|
||||
</names>
|
||||
</formatting> -->
|
||||
</logger>
|
||||
|
||||
<!-- Add headers to response in options request. OPTIONS method is used in CORS preflight requests. -->
|
||||
|
@ -247,6 +247,7 @@ add_object_library(clickhouse_databases Databases)
|
||||
add_object_library(clickhouse_databases_mysql Databases/MySQL)
|
||||
add_object_library(clickhouse_disks Disks)
|
||||
add_object_library(clickhouse_interpreters Interpreters)
|
||||
add_object_library(clickhouse_interpreters_cache Interpreters/Cache)
|
||||
add_object_library(clickhouse_interpreters_access Interpreters/Access)
|
||||
add_object_library(clickhouse_interpreters_mysql Interpreters/MySQL)
|
||||
add_object_library(clickhouse_interpreters_clusterproxy Interpreters/ClusterProxy)
|
||||
|
@ -91,13 +91,6 @@ static const NameSet exit_strings
|
||||
"q", "й", "\\q", "\\Q", "\\й", "\\Й", ":q", "Жй"
|
||||
};
|
||||
|
||||
static const std::initializer_list<std::pair<String, String>> backslash_aliases
|
||||
{
|
||||
{ "\\l", "SHOW DATABASES" },
|
||||
{ "\\d", "SHOW TABLES" },
|
||||
{ "\\c", "USE" },
|
||||
};
|
||||
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
@ -1999,6 +1992,21 @@ void ClientBase::runInteractive()
|
||||
/// Enable bracketed-paste-mode so that we are able to paste multiline queries as a whole.
|
||||
lr.enableBracketedPaste();
|
||||
|
||||
static const std::initializer_list<std::pair<String, String>> backslash_aliases =
|
||||
{
|
||||
{ "\\l", "SHOW DATABASES" },
|
||||
{ "\\d", "SHOW TABLES" },
|
||||
{ "\\c", "USE" },
|
||||
};
|
||||
|
||||
static const std::initializer_list<String> repeat_last_input_aliases =
|
||||
{
|
||||
".", /// Vim shortcut
|
||||
"/" /// Oracle SQL Plus shortcut
|
||||
};
|
||||
|
||||
String last_input;
|
||||
|
||||
do
|
||||
{
|
||||
auto input = lr.readLine(prompt(), ":-] ");
|
||||
@ -2016,7 +2024,7 @@ void ClientBase::runInteractive()
|
||||
has_vertical_output_suffix = true;
|
||||
}
|
||||
|
||||
for (const auto& [alias, command] : backslash_aliases)
|
||||
for (const auto & [alias, command] : backslash_aliases)
|
||||
{
|
||||
auto it = std::search(input.begin(), input.end(), alias.begin(), alias.end());
|
||||
if (it != input.end() && std::all_of(input.begin(), it, isWhitespaceASCII))
|
||||
@ -2034,10 +2042,20 @@ void ClientBase::runInteractive()
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto & alias : repeat_last_input_aliases)
|
||||
{
|
||||
if (input == alias)
|
||||
{
|
||||
input = last_input;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
if (!processQueryText(input))
|
||||
break;
|
||||
last_input = input;
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
|
@ -154,13 +154,15 @@ FieldInfo getFieldInfo(const Field & field)
|
||||
{
|
||||
FieldVisitorToScalarType to_scalar_type_visitor;
|
||||
applyVisitor(to_scalar_type_visitor, field);
|
||||
FieldVisitorToNumberOfDimensions to_number_dimension_visitor;
|
||||
|
||||
return
|
||||
{
|
||||
to_scalar_type_visitor.getScalarType(),
|
||||
to_scalar_type_visitor.haveNulls(),
|
||||
to_scalar_type_visitor.needConvertField(),
|
||||
applyVisitor(FieldVisitorToNumberOfDimensions(), field),
|
||||
applyVisitor(to_number_dimension_visitor, field),
|
||||
to_number_dimension_visitor.need_fold_dimension
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -35,6 +35,10 @@ struct FieldInfo
|
||||
|
||||
/// Number of dimension in array. 0 if field is scalar.
|
||||
size_t num_dimensions;
|
||||
|
||||
/// If true then this field is an array of variadic dimension field
|
||||
/// and we need to normalize the dimension
|
||||
bool need_fold_dimension;
|
||||
};
|
||||
|
||||
FieldInfo getFieldInfo(const Field & field);
|
||||
|
@ -370,6 +370,7 @@ void KeeperServer::startup(const Poco::Util::AbstractConfiguration & config, boo
|
||||
{
|
||||
auto log_entries = log_store->log_entries(state_machine->last_commit_index() + 1, next_log_idx);
|
||||
|
||||
size_t preprocessed = 0;
|
||||
LOG_INFO(log, "Preprocessing {} log entries", log_entries->size());
|
||||
auto idx = state_machine->last_commit_index() + 1;
|
||||
for (const auto & entry : *log_entries)
|
||||
@ -378,7 +379,12 @@ void KeeperServer::startup(const Poco::Util::AbstractConfiguration & config, boo
|
||||
state_machine->pre_commit(idx, entry->get_buf());
|
||||
|
||||
++idx;
|
||||
++preprocessed;
|
||||
|
||||
if (preprocessed % 50000 == 0)
|
||||
LOG_TRACE(log, "Preprocessed {}/{} entries", preprocessed, log_entries->size());
|
||||
}
|
||||
LOG_INFO(log, "Preprocessing done");
|
||||
}
|
||||
|
||||
loadLatestConfig();
|
||||
|
@ -369,7 +369,15 @@ void KeeperStorage::UncommittedState::addDeltas(std::vector<Delta> new_deltas)
|
||||
const auto & added_delta = deltas.emplace_back(std::move(delta));
|
||||
|
||||
if (!added_delta.path.empty())
|
||||
{
|
||||
deltas_for_path[added_delta.path].push_back(&added_delta);
|
||||
applyDelta(added_delta);
|
||||
}
|
||||
else if (const auto * auth_delta = std::get_if<AddAuthDelta>(&added_delta.operation))
|
||||
{
|
||||
auto & uncommitted_auth = session_and_auth[auth_delta->session_id];
|
||||
uncommitted_auth.emplace_back(&auth_delta->auth_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -385,6 +393,26 @@ void KeeperStorage::UncommittedState::commit(int64_t commit_zxid)
|
||||
break;
|
||||
}
|
||||
|
||||
auto & front_delta = deltas.front();
|
||||
|
||||
if (!front_delta.path.empty())
|
||||
{
|
||||
auto & path_deltas = deltas_for_path.at(front_delta.path);
|
||||
assert(path_deltas.front() == &front_delta);
|
||||
path_deltas.pop_front();
|
||||
if (path_deltas.empty())
|
||||
deltas_for_path.erase(front_delta.path);
|
||||
}
|
||||
else if (auto * add_auth = std::get_if<AddAuthDelta>(&front_delta.operation))
|
||||
{
|
||||
auto & uncommitted_auth = session_and_auth[add_auth->session_id];
|
||||
assert(!uncommitted_auth.empty() && uncommitted_auth.front() == &add_auth->auth_id);
|
||||
uncommitted_auth.pop_front();
|
||||
if (uncommitted_auth.empty())
|
||||
session_and_auth.erase(add_auth->session_id);
|
||||
|
||||
}
|
||||
|
||||
deltas.pop_front();
|
||||
}
|
||||
|
||||
@ -405,10 +433,12 @@ void KeeperStorage::UncommittedState::rollback(int64_t rollback_zxid)
|
||||
deltas.back().zxid,
|
||||
rollback_zxid);
|
||||
|
||||
auto delta_it = deltas.rbegin();
|
||||
|
||||
// we need to undo ephemeral mapping modifications
|
||||
// CreateNodeDelta added ephemeral for session id -> we need to remove it
|
||||
// RemoveNodeDelta removed ephemeral for session id -> we need to add it back
|
||||
for (auto delta_it = deltas.rbegin(); delta_it != deltas.rend(); ++delta_it)
|
||||
for (; delta_it != deltas.rend(); ++delta_it)
|
||||
{
|
||||
if (delta_it->zxid < rollback_zxid)
|
||||
break;
|
||||
@ -431,29 +461,56 @@ void KeeperStorage::UncommittedState::rollback(int64_t rollback_zxid)
|
||||
}
|
||||
},
|
||||
delta_it->operation);
|
||||
|
||||
auto & path_deltas = deltas_for_path.at(delta_it->path);
|
||||
if (path_deltas.back() == &*delta_it)
|
||||
{
|
||||
path_deltas.pop_back();
|
||||
if (path_deltas.empty())
|
||||
deltas_for_path.erase(delta_it->path);
|
||||
}
|
||||
}
|
||||
else if (auto * add_auth = std::get_if<AddAuthDelta>(&delta_it->operation))
|
||||
{
|
||||
auto & uncommitted_auth = session_and_auth[add_auth->session_id];
|
||||
if (uncommitted_auth.back() == &add_auth->auth_id)
|
||||
{
|
||||
uncommitted_auth.pop_back();
|
||||
if (uncommitted_auth.empty())
|
||||
session_and_auth.erase(add_auth->session_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::erase_if(deltas, [rollback_zxid](const auto & delta) { return delta.zxid == rollback_zxid; });
|
||||
if (delta_it == deltas.rend())
|
||||
deltas.clear();
|
||||
else
|
||||
deltas.erase(delta_it.base(), deltas.end());
|
||||
|
||||
std::unordered_set<std::string> deleted_nodes;
|
||||
absl::flat_hash_set<std::string> deleted_nodes;
|
||||
std::erase_if(
|
||||
nodes,
|
||||
[&, rollback_zxid](const auto & node)
|
||||
{
|
||||
if (node.second.zxid == rollback_zxid)
|
||||
{
|
||||
deleted_nodes.emplace(node.first);
|
||||
deleted_nodes.emplace(std::move(node.first));
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
});
|
||||
|
||||
// recalculate all the uncommitted deleted nodes
|
||||
for (const auto & delta : deltas)
|
||||
for (const auto & deleted_node : deleted_nodes)
|
||||
{
|
||||
if (!delta.path.empty() && deleted_nodes.contains(delta.path))
|
||||
applyDelta(delta);
|
||||
auto path_delta_it = deltas_for_path.find(deleted_node);
|
||||
if (path_delta_it != deltas_for_path.end())
|
||||
{
|
||||
for (const auto & delta : path_delta_it->second)
|
||||
{
|
||||
applyDelta(*delta);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -229,27 +229,42 @@ public:
|
||||
|
||||
bool hasACL(int64_t session_id, bool is_local, std::function<bool(const AuthID &)> predicate)
|
||||
{
|
||||
for (const auto & session_auth : storage.session_and_auth[session_id])
|
||||
const auto check_auth = [&](const auto & auth_ids)
|
||||
{
|
||||
if (predicate(session_auth))
|
||||
return true;
|
||||
}
|
||||
for (const auto & auth : auth_ids)
|
||||
{
|
||||
using TAuth = std::remove_reference_t<decltype(auth)>;
|
||||
|
||||
const AuthID * auth_ptr = nullptr;
|
||||
if constexpr (std::is_pointer_v<TAuth>)
|
||||
auth_ptr = auth;
|
||||
else
|
||||
auth_ptr = &auth;
|
||||
|
||||
if (predicate(*auth_ptr))
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
};
|
||||
|
||||
if (is_local)
|
||||
return check_auth(storage.session_and_auth[session_id]);
|
||||
|
||||
if (check_auth(storage.session_and_auth[session_id]))
|
||||
return true;
|
||||
|
||||
// check if there are uncommitted
|
||||
const auto auth_it = session_and_auth.find(session_id);
|
||||
if (auth_it == session_and_auth.end())
|
||||
return false;
|
||||
|
||||
for (const auto & delta : deltas)
|
||||
{
|
||||
if (const auto * auth_delta = std::get_if<KeeperStorage::AddAuthDelta>(&delta.operation);
|
||||
auth_delta && auth_delta->session_id == session_id && predicate(auth_delta->auth_id))
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
return check_auth(auth_it->second);
|
||||
}
|
||||
|
||||
std::shared_ptr<Node> tryGetNodeFromStorage(StringRef path) const;
|
||||
|
||||
std::unordered_map<int64_t, std::list<const AuthID *>> session_and_auth;
|
||||
|
||||
struct UncommittedNode
|
||||
{
|
||||
std::shared_ptr<Node> node{nullptr};
|
||||
@ -257,7 +272,32 @@ public:
|
||||
int64_t zxid{0};
|
||||
};
|
||||
|
||||
mutable std::unordered_map<std::string, UncommittedNode> nodes;
|
||||
struct Hash
|
||||
{
|
||||
auto operator()(const std::string_view view) const
|
||||
{
|
||||
SipHash hash;
|
||||
hash.update(view);
|
||||
return hash.get64();
|
||||
}
|
||||
|
||||
using is_transparent = void; // required to make find() work with different type than key_type
|
||||
};
|
||||
|
||||
struct Equal
|
||||
{
|
||||
auto operator()(const std::string_view a,
|
||||
const std::string_view b) const
|
||||
{
|
||||
return a == b;
|
||||
}
|
||||
|
||||
using is_transparent = void; // required to make find() work with different type than key_type
|
||||
};
|
||||
|
||||
mutable std::unordered_map<std::string, UncommittedNode, Hash, Equal> nodes;
|
||||
std::unordered_map<std::string, std::list<const Delta *>, Hash, Equal> deltas_for_path;
|
||||
|
||||
std::list<Delta> deltas;
|
||||
KeeperStorage & storage;
|
||||
};
|
||||
|
@ -1016,8 +1016,8 @@ void BaseDaemon::setupWatchdog()
|
||||
if (config().getRawString("logger.stream_compress", "false") == "true")
|
||||
{
|
||||
Poco::AutoPtr<OwnPatternFormatter> pf;
|
||||
if (config().getString("logger.formatting", "") == "json")
|
||||
pf = new OwnJSONPatternFormatter;
|
||||
if (config().getString("logger.formatting.type", "") == "json")
|
||||
pf = new OwnJSONPatternFormatter(config());
|
||||
else
|
||||
pf = new OwnPatternFormatter;
|
||||
Poco::AutoPtr<DB::OwnFormattingChannel> log = new DB::OwnFormattingChannel(pf, new Poco::ConsoleChannel(std::cerr));
|
||||
|
@ -737,14 +737,31 @@ Field FieldVisitorReplaceScalars::operator()(const Array & x) const
|
||||
return res;
|
||||
}
|
||||
|
||||
size_t FieldVisitorToNumberOfDimensions::operator()(const Array & x) const
|
||||
size_t FieldVisitorToNumberOfDimensions::operator()(const Array & x)
|
||||
{
|
||||
const size_t size = x.size();
|
||||
size_t dimensions = 0;
|
||||
for (size_t i = 0; i < size; ++i)
|
||||
dimensions = std::max(dimensions, applyVisitor(*this, x[i]));
|
||||
{
|
||||
size_t element_dimensions = applyVisitor(*this, x[i]);
|
||||
if (i > 0 && element_dimensions != dimensions)
|
||||
need_fold_dimension = true;
|
||||
dimensions = std::max(dimensions, element_dimensions);
|
||||
}
|
||||
|
||||
return 1 + dimensions;
|
||||
}
|
||||
|
||||
Field FieldVisitorFoldDimension::operator()(const Array & x) const
|
||||
{
|
||||
if (num_dimensions_to_fold == 0)
|
||||
return x;
|
||||
const size_t size = x.size();
|
||||
Array res(size);
|
||||
for (size_t i = 0; i < size; ++i)
|
||||
{
|
||||
res[i] = applyVisitor(FieldVisitorFoldDimension(num_dimensions_to_fold - 1), x[i]);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
@ -114,10 +114,42 @@ private:
|
||||
class FieldVisitorToNumberOfDimensions : public StaticVisitor<size_t>
|
||||
{
|
||||
public:
|
||||
size_t operator()(const Array & x) const;
|
||||
size_t operator()(const Array & x);
|
||||
|
||||
template <typename T>
|
||||
size_t operator()(const T &) const { return 0; }
|
||||
|
||||
bool need_fold_dimension = false;
|
||||
};
|
||||
|
||||
/// Fold field (except Null) to the higher dimension, e.g. `1` -- fold 2 --> `[[1]]`
|
||||
/// used to normalize dimension of element in an array. e.g [1, [2]] --> [[1], [2]]
|
||||
class FieldVisitorFoldDimension : public StaticVisitor<Field>
|
||||
{
|
||||
public:
|
||||
explicit FieldVisitorFoldDimension(size_t num_dimensions_to_fold_) : num_dimensions_to_fold(num_dimensions_to_fold_) { }
|
||||
|
||||
Field operator()(const Array & x) const;
|
||||
|
||||
Field operator()(const Null & x) const { return x; }
|
||||
|
||||
template <typename T>
|
||||
Field operator()(const T & x) const
|
||||
{
|
||||
if (num_dimensions_to_fold == 0)
|
||||
return x;
|
||||
Array res(1,x);
|
||||
for (size_t i = 1; i < num_dimensions_to_fold; ++i)
|
||||
{
|
||||
Array new_res;
|
||||
new_res.push_back(std::move(res));
|
||||
res = std::move(new_res);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
private:
|
||||
size_t num_dimensions_to_fold;
|
||||
};
|
||||
|
||||
/// Receives range of objects, which contains collections
|
||||
|
@ -65,6 +65,8 @@ void SerializationObject<Parser>::deserializeTextImpl(IColumn & column, Reader &
|
||||
for (size_t i = 0; i < paths.size(); ++i)
|
||||
{
|
||||
auto field_info = getFieldInfo(values[i]);
|
||||
if (field_info.need_fold_dimension)
|
||||
values[i] = applyVisitor(FieldVisitorFoldDimension(field_info.num_dimensions), std::move(values[i]));
|
||||
if (isNothing(field_info.scalar_type))
|
||||
continue;
|
||||
|
||||
|
@ -1,13 +1,13 @@
|
||||
#pragma once
|
||||
|
||||
#include <Common/FileCache.h>
|
||||
#include <Interpreters/Cache/FileCache.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <IO/SeekableReadBuffer.h>
|
||||
#include <IO/WriteBufferFromFile.h>
|
||||
#include <IO/ReadSettings.h>
|
||||
#include <IO/ReadBufferFromFileBase.h>
|
||||
#include <Interpreters/FilesystemCacheLog.h>
|
||||
#include <Common/FileSegment.h>
|
||||
#include <Interpreters/Cache/FileSegment.h>
|
||||
|
||||
|
||||
namespace CurrentMetrics
|
||||
|
@ -1,7 +1,7 @@
|
||||
#include "CachedOnDiskWriteBufferFromFile.h"
|
||||
|
||||
#include <Common/FileCacheFactory.h>
|
||||
#include <Common/FileSegment.h>
|
||||
#include <Interpreters/Cache/FileCacheFactory.h>
|
||||
#include <Interpreters/Cache/FileSegment.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Interpreters/FilesystemCacheLog.h>
|
||||
#include <Interpreters/Context.h>
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
#include <IO/WriteBufferFromFileDecorator.h>
|
||||
#include <IO/WriteSettings.h>
|
||||
#include <Common/FileCache.h>
|
||||
#include <Interpreters/Cache/FileCache.h>
|
||||
#include <Interpreters/FilesystemCacheLog.h>
|
||||
|
||||
namespace Poco
|
||||
|
@ -4,8 +4,8 @@
|
||||
#include <IO/BoundedReadBuffer.h>
|
||||
#include <Disks/IO/CachedOnDiskWriteBufferFromFile.h>
|
||||
#include <Disks/IO/CachedOnDiskReadBufferFromFile.h>
|
||||
#include <Common/FileCache.h>
|
||||
#include <Common/FileCacheFactory.h>
|
||||
#include <Interpreters/Cache/FileCache.h>
|
||||
#include <Interpreters/Cache/FileCacheFactory.h>
|
||||
#include <Common/CurrentThread.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <filesystem>
|
||||
|
@ -1,8 +1,8 @@
|
||||
#pragma once
|
||||
|
||||
#include <Disks/ObjectStorages/IObjectStorage.h>
|
||||
#include <Common/FileCache.h>
|
||||
#include <Common/FileCacheSettings.h>
|
||||
#include <Interpreters/Cache/FileCache.h>
|
||||
#include <Interpreters/Cache/FileCacheSettings.h>
|
||||
|
||||
namespace Poco
|
||||
{
|
||||
|
@ -1,6 +1,6 @@
|
||||
#include <Common/FileCacheSettings.h>
|
||||
#include <Common/FileCacheFactory.h>
|
||||
#include <Common/FileCache.h>
|
||||
#include <Interpreters/Cache/FileCacheSettings.h>
|
||||
#include <Interpreters/Cache/FileCacheFactory.h>
|
||||
#include <Interpreters/Cache/FileCache.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/assert_cast.h>
|
||||
#include <Disks/DiskFactory.h>
|
||||
|
@ -10,7 +10,6 @@
|
||||
#include <Common/quoteString.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/filesystemHelpers.h>
|
||||
#include <Common/FileCache.h>
|
||||
#include <Disks/ObjectStorages/Cached/CachedObjectStorage.h>
|
||||
#include <Disks/ObjectStorages/DiskObjectStorageRemoteMetadataRestoreHelper.h>
|
||||
#include <Disks/ObjectStorages/DiskObjectStorageTransaction.h>
|
||||
|
@ -2,7 +2,6 @@
|
||||
|
||||
#include <Disks/IDisk.h>
|
||||
#include <Disks/ObjectStorages/IObjectStorage.h>
|
||||
#include <Common/FileCache_fwd.h>
|
||||
#include <Disks/ObjectStorages/DiskObjectStorageRemoteMetadataRestoreHelper.h>
|
||||
#include <Disks/ObjectStorages/IMetadataStorage.h>
|
||||
#include <Disks/ObjectStorages/DiskObjectStorageTransaction.h>
|
||||
|
@ -17,7 +17,6 @@
|
||||
#include <Disks/ObjectStorages/StoredObject.h>
|
||||
#include <Disks/DiskType.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <Common/FileCache.h>
|
||||
#include <Disks/WriteMode.h>
|
||||
|
||||
|
||||
|
@ -1,8 +1,6 @@
|
||||
#include <Disks/ObjectStorages/LocalObjectStorage.h>
|
||||
|
||||
#include <Disks/ObjectStorages/DiskObjectStorageCommon.h>
|
||||
#include <Common/FileCache.h>
|
||||
#include <Common/FileCacheFactory.h>
|
||||
#include <Common/filesystemHelpers.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Disks/IO/createReadBufferFromFileBase.h>
|
||||
|
@ -22,8 +22,6 @@
|
||||
#include <Disks/DiskRestartProxy.h>
|
||||
#include <Disks/DiskLocal.h>
|
||||
|
||||
#include <Common/FileCacheFactory.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
|
@ -6,7 +6,6 @@
|
||||
|
||||
#include <aws/core/client/DefaultRetryStrategy.h>
|
||||
#include <IO/S3Common.h>
|
||||
#include <Disks/DiskCacheWrapper.h>
|
||||
#include <Storages/StorageS3Settings.h>
|
||||
#include <Disks/ObjectStorages/S3/ProxyConfiguration.h>
|
||||
#include <Disks/ObjectStorages/S3/ProxyListConfiguration.h>
|
||||
@ -14,7 +13,6 @@
|
||||
#include <Disks/DiskRestartProxy.h>
|
||||
#include <Disks/DiskLocal.h>
|
||||
#include <Disks/ObjectStorages/DiskObjectStorageCommon.h>
|
||||
#include <Common/FileCacheFactory.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
|
@ -272,6 +272,19 @@ private:
|
||||
writeNumber2(target + 6, ToSecondImpl::execute(source, timezone));
|
||||
}
|
||||
|
||||
static void timezoneOffset(char * target, Time source, const DateLUTImpl & timezone)
|
||||
{
|
||||
auto offset = TimezoneOffsetImpl::execute(source, timezone);
|
||||
if (offset < 0)
|
||||
{
|
||||
*target = '-';
|
||||
offset = -offset;
|
||||
}
|
||||
|
||||
writeNumber2(target + 1, offset / 3600);
|
||||
writeNumber2(target + 3, offset % 3600 / 60);
|
||||
}
|
||||
|
||||
static void quarter(char * target, Time source, const DateLUTImpl & timezone)
|
||||
{
|
||||
*target += ToQuarterImpl::execute(source, timezone);
|
||||
@ -632,6 +645,12 @@ public:
|
||||
result.append("0");
|
||||
break;
|
||||
|
||||
// Offset from UTC timezone as +hhmm or -hhmm
|
||||
case 'z':
|
||||
instructions.emplace_back(&Action<T>::timezoneOffset, 5);
|
||||
result.append("+0000");
|
||||
break;
|
||||
|
||||
/// Time components. If the argument is Date, not a DateTime, then this components will have default value.
|
||||
|
||||
// Minute (00-59)
|
||||
|
@ -636,8 +636,9 @@ concept WithResize = requires (T value)
|
||||
template <typename Vector>
|
||||
void readCSVStringInto(Vector & s, ReadBuffer & buf, const FormatSettings::CSV & settings)
|
||||
{
|
||||
/// Empty string
|
||||
if (buf.eof())
|
||||
throwReadAfterEOF();
|
||||
return;
|
||||
|
||||
const char delimiter = settings.delimiter;
|
||||
const char maybe_quote = *buf.position();
|
||||
|
@ -3,7 +3,7 @@
|
||||
#include <cstddef>
|
||||
#include <string>
|
||||
#include <Core/Defines.h>
|
||||
#include <Common/FileCache_fwd.h>
|
||||
#include <Interpreters/Cache/FileCache_fwd.h>
|
||||
#include <Common/Throttler_fwd.h>
|
||||
|
||||
namespace DB
|
||||
|
@ -4,7 +4,7 @@
|
||||
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/Throttler.h>
|
||||
#include <Common/FileCache.h>
|
||||
#include <Interpreters/Cache/FileCache.h>
|
||||
|
||||
#include <IO/WriteBufferFromS3.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
|
@ -11,10 +11,10 @@
|
||||
#include <Common/CurrentMetrics.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <Common/filesystemHelpers.h>
|
||||
#include <Common/FileCacheFactory.h>
|
||||
#include <Interpreters/Cache/FileCacheFactory.h>
|
||||
#include <Common/getCurrentProcessFDCount.h>
|
||||
#include <Common/getMaxFileDescriptorCount.h>
|
||||
#include <Common/FileCache.h>
|
||||
#include <Interpreters/Cache/FileCache.h>
|
||||
#include <Server/ProtocolServerAdapter.h>
|
||||
#include <Storages/MarkCache.h>
|
||||
#include <Storages/StorageMergeTree.h>
|
||||
@ -77,9 +77,11 @@ static std::unique_ptr<ReadBufferFromFilePRead> openFileIfExists(const std::stri
|
||||
AsynchronousMetrics::AsynchronousMetrics(
|
||||
ContextPtr global_context_,
|
||||
int update_period_seconds,
|
||||
int heavy_metrics_update_period_seconds,
|
||||
const ProtocolServerMetricsFunc & protocol_server_metrics_func_)
|
||||
: WithContext(global_context_)
|
||||
, update_period(update_period_seconds)
|
||||
, heavy_metric_update_period(heavy_metrics_update_period_seconds)
|
||||
, protocol_server_metrics_func(protocol_server_metrics_func_)
|
||||
, log(&Poco::Logger::get("AsynchronousMetrics"))
|
||||
{
|
||||
@ -563,7 +565,7 @@ AsynchronousMetrics::NetworkInterfaceStatValues::operator-(const AsynchronousMet
|
||||
#endif
|
||||
|
||||
|
||||
void AsynchronousMetrics::update(std::chrono::system_clock::time_point update_time)
|
||||
void AsynchronousMetrics::update(TimePoint update_time)
|
||||
{
|
||||
Stopwatch watch;
|
||||
|
||||
@ -1584,6 +1586,8 @@ void AsynchronousMetrics::update(std::chrono::system_clock::time_point update_ti
|
||||
saveAllArenasMetric<size_t>(new_values, "muzzy_purged");
|
||||
#endif
|
||||
|
||||
updateHeavyMetricsIfNeeded(current_time, update_time, new_values);
|
||||
|
||||
/// Add more metrics as you wish.
|
||||
|
||||
new_values["AsynchronousMetricsCalculationTimeSpent"] = watch.elapsedSeconds();
|
||||
@ -1601,4 +1605,76 @@ void AsynchronousMetrics::update(std::chrono::system_clock::time_point update_ti
|
||||
values = new_values;
|
||||
}
|
||||
|
||||
void AsynchronousMetrics::updateDetachedPartsStats()
|
||||
{
|
||||
DetachedPartsStats current_values{};
|
||||
|
||||
for (const auto & db : DatabaseCatalog::instance().getDatabases())
|
||||
{
|
||||
if (!db.second->canContainMergeTreeTables())
|
||||
continue;
|
||||
|
||||
for (auto iterator = db.second->getTablesIterator(getContext()); iterator->isValid(); iterator->next())
|
||||
{
|
||||
const auto & table = iterator->table();
|
||||
if (!table)
|
||||
continue;
|
||||
|
||||
if (MergeTreeData * table_merge_tree = dynamic_cast<MergeTreeData *>(table.get()))
|
||||
{
|
||||
for (const auto & detached_part: table_merge_tree->getDetachedParts())
|
||||
{
|
||||
if (!detached_part.valid_name)
|
||||
continue;
|
||||
|
||||
if (detached_part.prefix.empty())
|
||||
++current_values.detached_by_user;
|
||||
|
||||
++current_values.count;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
detached_parts_stats = current_values;
|
||||
}
|
||||
|
||||
void AsynchronousMetrics::updateHeavyMetricsIfNeeded(TimePoint current_time, TimePoint update_time, AsynchronousMetricValues & new_values)
|
||||
{
|
||||
const auto time_after_previous_update = current_time - heavy_metric_previous_update_time;
|
||||
const bool update_heavy_metric = time_after_previous_update >= heavy_metric_update_period || first_run;
|
||||
|
||||
if (update_heavy_metric)
|
||||
{
|
||||
heavy_metric_previous_update_time = update_time;
|
||||
|
||||
Stopwatch watch;
|
||||
|
||||
/// Test shows that listing 100000 entries consuming around 0.15 sec.
|
||||
updateDetachedPartsStats();
|
||||
|
||||
watch.stop();
|
||||
|
||||
/// Normally heavy metrics don't delay the rest of the metrics calculation
|
||||
/// otherwise log the warning message
|
||||
auto log_level = std::make_pair(DB::LogsLevel::trace, Poco::Message::PRIO_TRACE);
|
||||
if (watch.elapsedSeconds() > (update_period.count() / 2.))
|
||||
log_level = std::make_pair(DB::LogsLevel::debug, Poco::Message::PRIO_DEBUG);
|
||||
else if (watch.elapsedSeconds() > (update_period.count() / 4. * 3))
|
||||
log_level = std::make_pair(DB::LogsLevel::warning, Poco::Message::PRIO_WARNING);
|
||||
LOG_IMPL(log, log_level.first, log_level.second,
|
||||
"Update heavy metrics. "
|
||||
"Update period {} sec. "
|
||||
"Update heavy metrics period {} sec. "
|
||||
"Heavy metrics calculation elapsed: {} sec.",
|
||||
update_period.count(),
|
||||
heavy_metric_update_period.count(),
|
||||
watch.elapsedSeconds());
|
||||
|
||||
}
|
||||
|
||||
new_values["NumberOfDetachedParts"] = detached_parts_stats.count;
|
||||
new_values["NumberOfDetachedByUserParts"] = detached_parts_stats.detached_by_user;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -50,6 +50,7 @@ public:
|
||||
AsynchronousMetrics(
|
||||
ContextPtr global_context_,
|
||||
int update_period_seconds,
|
||||
int heavy_metrics_update_period_seconds,
|
||||
const ProtocolServerMetricsFunc & protocol_server_metrics_func_);
|
||||
|
||||
~AsynchronousMetrics();
|
||||
@ -63,7 +64,11 @@ public:
|
||||
AsynchronousMetricValues getValues() const;
|
||||
|
||||
private:
|
||||
const std::chrono::seconds update_period;
|
||||
using Duration = std::chrono::seconds;
|
||||
using TimePoint = std::chrono::system_clock::time_point;
|
||||
|
||||
const Duration update_period;
|
||||
const Duration heavy_metric_update_period;
|
||||
ProtocolServerMetricsFunc protocol_server_metrics_func;
|
||||
|
||||
mutable std::mutex mutex;
|
||||
@ -74,7 +79,16 @@ private:
|
||||
/// Some values are incremental and we have to calculate the difference.
|
||||
/// On first run we will only collect the values to subtract later.
|
||||
bool first_run = true;
|
||||
std::chrono::system_clock::time_point previous_update_time;
|
||||
TimePoint previous_update_time;
|
||||
TimePoint heavy_metric_previous_update_time;
|
||||
|
||||
struct DetachedPartsStats
|
||||
{
|
||||
size_t count;
|
||||
size_t detached_by_user;
|
||||
};
|
||||
|
||||
DetachedPartsStats detached_parts_stats{};
|
||||
|
||||
#if defined(OS_LINUX) || defined(OS_FREEBSD)
|
||||
MemoryStatisticsOS memory_stat;
|
||||
@ -185,7 +199,10 @@ private:
|
||||
std::unique_ptr<ThreadFromGlobalPool> thread;
|
||||
|
||||
void run();
|
||||
void update(std::chrono::system_clock::time_point update_time);
|
||||
void update(TimePoint update_time);
|
||||
|
||||
void updateDetachedPartsStats();
|
||||
void updateHeavyMetricsIfNeeded(TimePoint current_time, TimePoint update_time, AsynchronousMetricValues & new_values);
|
||||
|
||||
Poco::Logger * log;
|
||||
};
|
||||
|
@ -2,7 +2,8 @@
|
||||
|
||||
#include <Common/randomSeed.h>
|
||||
#include <Common/SipHash.h>
|
||||
#include <Common/FileCacheSettings.h>
|
||||
#include <Interpreters/Cache/FileCacheSettings.h>
|
||||
#include <Interpreters/Cache/LRUFileCachePriority.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/WriteBufferFromFile.h>
|
||||
#include <IO/ReadSettings.h>
|
||||
@ -10,7 +11,6 @@
|
||||
#include <IO/Operators.h>
|
||||
#include <pcg-random/pcg_random.hpp>
|
||||
#include <filesystem>
|
||||
#include <Common/LRUFileCachePriority.h>
|
||||
|
||||
namespace fs = std::filesystem;
|
||||
|
@ -13,14 +13,13 @@
|
||||
|
||||
#include <Core/Types.h>
|
||||
#include <IO/ReadSettings.h>
|
||||
#include <Common/FileCache_fwd.h>
|
||||
#include <Common/FileSegment.h>
|
||||
#include <Common/IFileCachePriority.h>
|
||||
#include <Interpreters/Cache/FileCache_fwd.h>
|
||||
#include <Interpreters/Cache/FileSegment.h>
|
||||
#include <Interpreters/Cache/IFileCachePriority.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/FileCacheType.h>
|
||||
#include <Interpreters/Cache/FileCacheKey.h>
|
||||
#include <Common/StatusFile.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
@ -1,7 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <Common/FileCache_fwd.h>
|
||||
#include <Common/FileCacheSettings.h>
|
||||
#include <Interpreters/Cache/FileCache_fwd.h>
|
||||
#include <Interpreters/Cache/FileCacheSettings.h>
|
||||
|
||||
#include <boost/noncopyable.hpp>
|
||||
#include <unordered_map>
|
@ -1,6 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <Common/FileCache_fwd.h>
|
||||
#include <Interpreters/Cache/FileCache_fwd.h>
|
||||
|
||||
namespace Poco { namespace Util { class AbstractConfiguration; } } // NOLINT(cppcoreguidelines-virtual-class-destructor)
|
||||
|
@ -3,7 +3,7 @@
|
||||
#include <base/getThreadId.h>
|
||||
#include <base/scope_guard.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/FileCache.h>
|
||||
#include <Interpreters/Cache/FileCache.h>
|
||||
#include <Common/hex.h>
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
#include <IO/Operators.h>
|
@ -5,7 +5,8 @@
|
||||
#include <IO/WriteBufferFromFile.h>
|
||||
#include <IO/ReadBufferFromFileBase.h>
|
||||
#include <list>
|
||||
#include <Common/FileCacheType.h>
|
||||
#include <Interpreters/Cache/FileCacheKey.h>
|
||||
|
||||
|
||||
namespace Poco { class Logger; }
|
||||
|
@ -4,7 +4,7 @@
|
||||
#include <mutex>
|
||||
#include <Core/Types.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/FileCacheType.h>
|
||||
#include <Interpreters/Cache/FileCacheKey.h>
|
||||
|
||||
namespace DB
|
||||
{
|
@ -1,4 +1,4 @@
|
||||
#include <Common/LRUFileCachePriority.h>
|
||||
#include <Interpreters/Cache/LRUFileCachePriority.h>
|
||||
#include <Common/CurrentMetrics.h>
|
||||
|
||||
namespace CurrentMetrics
|
@ -1,7 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <list>
|
||||
#include <Common/IFileCachePriority.h>
|
||||
#include <Interpreters/Cache/IFileCachePriority.h>
|
||||
#include <Common/logger_useful.h>
|
||||
|
||||
namespace DB
|
@ -5,8 +5,8 @@
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <Storages/ColumnsDescription.h>
|
||||
#include <Common/FileCacheFactory.h>
|
||||
#include <Common/FileCache.h>
|
||||
#include <Interpreters/Cache/FileCacheFactory.h>
|
||||
#include <Interpreters/Cache/FileCache.h>
|
||||
#include <Access/Common/AccessFlags.h>
|
||||
#include <Core/Block.h>
|
||||
|
||||
|
@ -7,7 +7,7 @@
|
||||
#include <Interpreters/InterpreterShowTablesQuery.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <Storages/ColumnsDescription.h>
|
||||
#include <Common/FileCacheFactory.h>
|
||||
#include <Interpreters/Cache/FileCacheFactory.h>
|
||||
#include <Processors/Sources/SourceFromSingleChunk.h>
|
||||
#include <Access/Common/AccessFlags.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
|
@ -7,8 +7,8 @@
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <Common/escapeForFileName.h>
|
||||
#include <Common/ShellCommand.h>
|
||||
#include <Common/FileCacheFactory.h>
|
||||
#include <Common/FileCache.h>
|
||||
#include <Interpreters/Cache/FileCacheFactory.h>
|
||||
#include <Interpreters/Cache/FileCache.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/DatabaseCatalog.h>
|
||||
#include <Interpreters/ExternalDictionariesLoader.h>
|
||||
|
@ -1,11 +1,11 @@
|
||||
#include <iomanip>
|
||||
#include <iostream>
|
||||
#include <gtest/gtest.h>
|
||||
#include <Common/FileCache.h>
|
||||
#include <Common/FileSegment.h>
|
||||
#include <Interpreters/Cache/FileCache.h>
|
||||
#include <Interpreters/Cache/FileSegment.h>
|
||||
#include <Interpreters/Cache/FileCacheSettings.h>
|
||||
#include <Common/CurrentThread.h>
|
||||
#include <Common/filesystemHelpers.h>
|
||||
#include <Common/FileCacheSettings.h>
|
||||
#include <Common/tests/gtest_global_context.h>
|
||||
#include <Common/SipHash.h>
|
||||
#include <Common/hex.h>
|
@ -99,8 +99,8 @@ void Loggers::buildLoggers(Poco::Util::AbstractConfiguration & config, Poco::Log
|
||||
|
||||
Poco::AutoPtr<OwnPatternFormatter> pf;
|
||||
|
||||
if (config.getString("logger.formatting", "") == "json")
|
||||
pf = new OwnJSONPatternFormatter;
|
||||
if (config.getString("logger.formatting.type", "") == "json")
|
||||
pf = new OwnJSONPatternFormatter(config);
|
||||
else
|
||||
pf = new OwnPatternFormatter;
|
||||
|
||||
@ -140,8 +140,8 @@ void Loggers::buildLoggers(Poco::Util::AbstractConfiguration & config, Poco::Log
|
||||
|
||||
Poco::AutoPtr<OwnPatternFormatter> pf;
|
||||
|
||||
if (config.getString("logger.formatting", "") == "json")
|
||||
pf = new OwnJSONPatternFormatter;
|
||||
if (config.getString("logger.formatting.type", "") == "json")
|
||||
pf = new OwnJSONPatternFormatter(config);
|
||||
else
|
||||
pf = new OwnPatternFormatter;
|
||||
|
||||
@ -184,8 +184,8 @@ void Loggers::buildLoggers(Poco::Util::AbstractConfiguration & config, Poco::Log
|
||||
|
||||
Poco::AutoPtr<OwnPatternFormatter> pf;
|
||||
|
||||
if (config.getString("logger.formatting", "") == "json")
|
||||
pf = new OwnJSONPatternFormatter;
|
||||
if (config.getString("logger.formatting.type", "") == "json")
|
||||
pf = new OwnJSONPatternFormatter(config);
|
||||
else
|
||||
pf = new OwnPatternFormatter;
|
||||
|
||||
@ -211,8 +211,8 @@ void Loggers::buildLoggers(Poco::Util::AbstractConfiguration & config, Poco::Log
|
||||
}
|
||||
|
||||
Poco::AutoPtr<OwnPatternFormatter> pf;
|
||||
if (config.getString("logger.formatting", "") == "json")
|
||||
pf = new OwnJSONPatternFormatter;
|
||||
if (config.getString("logger.formatting.type", "") == "json")
|
||||
pf = new OwnJSONPatternFormatter(config);
|
||||
else
|
||||
pf = new OwnPatternFormatter(color_enabled);
|
||||
Poco::AutoPtr<DB::OwnFormattingChannel> log = new DB::OwnFormattingChannel(pf, new Poco::ConsoleChannel);
|
||||
|
@ -8,93 +8,186 @@
|
||||
#include <Common/CurrentThread.h>
|
||||
#include <Common/HashTable/Hash.h>
|
||||
|
||||
OwnJSONPatternFormatter::OwnJSONPatternFormatter() : OwnPatternFormatter("")
|
||||
OwnJSONPatternFormatter::OwnJSONPatternFormatter(Poco::Util::AbstractConfiguration & config)
|
||||
{
|
||||
}
|
||||
if (config.has("logger.formatting.names.date_time"))
|
||||
date_time = config.getString("logger.formatting.names.date_time", "");
|
||||
|
||||
if (config.has("logger.formatting.names.thread_name"))
|
||||
thread_name = config.getString("logger.formatting.names.thread_name", "");
|
||||
|
||||
if (config.has("logger.formatting.names.thread_id"))
|
||||
thread_id = config.getString("logger.formatting.names.thread_id", "");
|
||||
|
||||
if (config.has("logger.formatting.names.level"))
|
||||
level = config.getString("logger.formatting.names.level", "");
|
||||
|
||||
if (config.has("logger.formatting.names.query_id"))
|
||||
query_id = config.getString("logger.formatting.names.query_id", "");
|
||||
|
||||
if (config.has("logger.formatting.names.logger_name"))
|
||||
logger_name = config.getString("logger.formatting.names.logger_name", "");
|
||||
|
||||
if (config.has("logger.formatting.names.message"))
|
||||
message = config.getString("logger.formatting.names.message", "");
|
||||
|
||||
if (config.has("logger.formatting.names.source_file"))
|
||||
source_file = config.getString("logger.formatting.names.source_file", "");
|
||||
|
||||
if (config.has("logger.formatting.names.source_line"))
|
||||
source_line = config.getString("logger.formatting.names.source_line", "");
|
||||
|
||||
if (date_time.empty() && thread_name.empty() && thread_id.empty() && level.empty() && query_id.empty()
|
||||
&& logger_name.empty() && message.empty() && source_file.empty() && source_line.empty())
|
||||
{
|
||||
date_time = "date_time";
|
||||
thread_name = "thread_name";
|
||||
thread_id = "thread_id";
|
||||
level = "level";
|
||||
query_id = "query_id";
|
||||
logger_name = "logger_name";
|
||||
message = "message";
|
||||
source_file = "source_file";
|
||||
source_line = "source_line";
|
||||
}
|
||||
}
|
||||
|
||||
void OwnJSONPatternFormatter::formatExtended(const DB::ExtendedLogMessage & msg_ext, std::string & text) const
|
||||
{
|
||||
DB::WriteBufferFromString wb(text);
|
||||
|
||||
DB::FormatSettings settings;
|
||||
bool print_comma = false;
|
||||
|
||||
const Poco::Message & msg = msg_ext.base;
|
||||
DB::writeChar('{', wb);
|
||||
|
||||
writeJSONString("date_time", wb, settings);
|
||||
DB::writeChar(':', wb);
|
||||
if (!date_time.empty())
|
||||
{
|
||||
writeJSONString(date_time, wb, settings);
|
||||
DB::writeChar(':', wb);
|
||||
|
||||
DB::writeChar('\"', wb);
|
||||
/// Change delimiters in date for compatibility with old logs.
|
||||
writeDateTimeUnixTimestamp(msg_ext.time_seconds, 0, wb);
|
||||
DB::writeChar('.', wb);
|
||||
DB::writeChar('0' + ((msg_ext.time_microseconds / 100000) % 10), wb);
|
||||
DB::writeChar('0' + ((msg_ext.time_microseconds / 10000) % 10), wb);
|
||||
DB::writeChar('0' + ((msg_ext.time_microseconds / 1000) % 10), wb);
|
||||
DB::writeChar('0' + ((msg_ext.time_microseconds / 100) % 10), wb);
|
||||
DB::writeChar('0' + ((msg_ext.time_microseconds / 10) % 10), wb);
|
||||
DB::writeChar('0' + ((msg_ext.time_microseconds / 1) % 10), wb);
|
||||
DB::writeChar('\"', wb);
|
||||
DB::writeChar('\"', wb);
|
||||
/// Change delimiters in date for compatibility with old logs.
|
||||
writeDateTimeUnixTimestamp(msg_ext.time_seconds, 0, wb);
|
||||
DB::writeChar('.', wb);
|
||||
DB::writeChar('0' + ((msg_ext.time_microseconds / 100000) % 10), wb);
|
||||
DB::writeChar('0' + ((msg_ext.time_microseconds / 10000) % 10), wb);
|
||||
DB::writeChar('0' + ((msg_ext.time_microseconds / 1000) % 10), wb);
|
||||
DB::writeChar('0' + ((msg_ext.time_microseconds / 100) % 10), wb);
|
||||
DB::writeChar('0' + ((msg_ext.time_microseconds / 10) % 10), wb);
|
||||
DB::writeChar('0' + ((msg_ext.time_microseconds / 1) % 10), wb);
|
||||
DB::writeChar('\"', wb);
|
||||
print_comma = true;
|
||||
}
|
||||
|
||||
DB::writeChar(',', wb);
|
||||
if (!thread_name.empty())
|
||||
{
|
||||
if (print_comma)
|
||||
DB::writeChar(',', wb);
|
||||
else
|
||||
print_comma = true;
|
||||
|
||||
writeJSONString("thread_name", wb, settings);
|
||||
DB::writeChar(':', wb);
|
||||
writeJSONString(thread_name, wb, settings);
|
||||
DB::writeChar(':', wb);
|
||||
|
||||
writeJSONString(msg.getThread(), wb, settings);
|
||||
writeJSONString(msg.getThread(), wb, settings);
|
||||
}
|
||||
|
||||
DB::writeChar(',', wb);
|
||||
if (!thread_id.empty())
|
||||
{
|
||||
if (print_comma)
|
||||
DB::writeChar(',', wb);
|
||||
else
|
||||
print_comma = true;
|
||||
|
||||
writeJSONString("thread_id", wb, settings);
|
||||
DB::writeChar(':', wb);
|
||||
DB::writeChar('\"', wb);
|
||||
DB::writeIntText(msg_ext.thread_id, wb);
|
||||
DB::writeChar('\"', wb);
|
||||
writeJSONString(thread_id, wb, settings);
|
||||
DB::writeChar(':', wb);
|
||||
DB::writeChar('\"', wb);
|
||||
DB::writeIntText(msg_ext.thread_id, wb);
|
||||
DB::writeChar('\"', wb);
|
||||
}
|
||||
|
||||
DB::writeChar(',', wb);
|
||||
if (!level.empty())
|
||||
{
|
||||
if (print_comma)
|
||||
DB::writeChar(',', wb);
|
||||
else
|
||||
print_comma = true;
|
||||
|
||||
writeJSONString("level", wb, settings);
|
||||
DB::writeChar(':', wb);
|
||||
int priority = static_cast<int>(msg.getPriority());
|
||||
writeJSONString(std::to_string(priority), wb, settings);
|
||||
DB::writeChar(',', wb);
|
||||
writeJSONString(level, wb, settings);
|
||||
DB::writeChar(':', wb);
|
||||
int priority = static_cast<int>(msg.getPriority());
|
||||
writeJSONString(std::to_string(priority), wb, settings);
|
||||
}
|
||||
|
||||
/// We write query_id even in case when it is empty (no query context)
|
||||
/// just to be convenient for various log parsers.
|
||||
if (!query_id.empty())
|
||||
{
|
||||
if (print_comma)
|
||||
DB::writeChar(',', wb);
|
||||
else
|
||||
print_comma = true;
|
||||
|
||||
writeJSONString("query_id", wb, settings);
|
||||
DB::writeChar(':', wb);
|
||||
writeJSONString(msg_ext.query_id, wb, settings);
|
||||
/// We write query_id even in case when it is empty (no query context)
|
||||
/// just to be convenient for various log parsers.
|
||||
|
||||
DB::writeChar(',', wb);
|
||||
writeJSONString(query_id, wb, settings);
|
||||
DB::writeChar(':', wb);
|
||||
writeJSONString(msg_ext.query_id, wb, settings);
|
||||
}
|
||||
|
||||
writeJSONString("logger_name", wb, settings);
|
||||
DB::writeChar(':', wb);
|
||||
if (!logger_name.empty())
|
||||
{
|
||||
if (print_comma)
|
||||
DB::writeChar(',', wb);
|
||||
else
|
||||
print_comma = true;
|
||||
|
||||
writeJSONString(msg.getSource(), wb, settings);
|
||||
DB::writeChar(',', wb);
|
||||
writeJSONString(logger_name, wb, settings);
|
||||
DB::writeChar(':', wb);
|
||||
|
||||
writeJSONString("message", wb, settings);
|
||||
DB::writeChar(':', wb);
|
||||
writeJSONString(msg.getText(), wb, settings);
|
||||
DB::writeChar(',', wb);
|
||||
writeJSONString(msg.getSource(), wb, settings);
|
||||
}
|
||||
|
||||
if (!message.empty())
|
||||
{
|
||||
if (print_comma)
|
||||
DB::writeChar(',', wb);
|
||||
else
|
||||
print_comma = true;
|
||||
|
||||
writeJSONString(message, wb, settings);
|
||||
DB::writeChar(':', wb);
|
||||
writeJSONString(msg.getText(), wb, settings);
|
||||
}
|
||||
|
||||
if (!source_file.empty())
|
||||
{
|
||||
if (print_comma)
|
||||
DB::writeChar(',', wb);
|
||||
else
|
||||
print_comma = true;
|
||||
|
||||
writeJSONString("source_file", wb, settings);
|
||||
DB::writeChar(':', wb);
|
||||
const char * source_file = msg.getSourceFile();
|
||||
if (source_file != nullptr)
|
||||
writeJSONString(source_file, wb, settings);
|
||||
else
|
||||
writeJSONString("", wb, settings);
|
||||
DB::writeChar(',', wb);
|
||||
DB::writeChar(':', wb);
|
||||
const char * source_file_name = msg.getSourceFile();
|
||||
if (source_file_name != nullptr)
|
||||
writeJSONString(source_file_name, wb, settings);
|
||||
else
|
||||
writeJSONString("", wb, settings);
|
||||
}
|
||||
|
||||
writeJSONString("source_line", wb, settings);
|
||||
DB::writeChar(':', wb);
|
||||
DB::writeChar('\"', wb);
|
||||
DB::writeIntText(msg.getSourceLine(), wb);
|
||||
DB::writeChar('\"', wb);
|
||||
if (!source_line.empty())
|
||||
{
|
||||
if (print_comma)
|
||||
DB::writeChar(',', wb);
|
||||
|
||||
writeJSONString(source_line, wb, settings);
|
||||
DB::writeChar(':', wb);
|
||||
DB::writeChar('\"', wb);
|
||||
DB::writeIntText(msg.getSourceLine(), wb);
|
||||
DB::writeChar('\"', wb);
|
||||
}
|
||||
DB::writeChar('}', wb);
|
||||
}
|
||||
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
|
||||
#include <Poco/PatternFormatter.h>
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
#include "ExtendedLogChannel.h"
|
||||
#include "OwnPatternFormatter.h"
|
||||
|
||||
@ -25,8 +26,19 @@ class Loggers;
|
||||
class OwnJSONPatternFormatter : public OwnPatternFormatter
|
||||
{
|
||||
public:
|
||||
OwnJSONPatternFormatter();
|
||||
OwnJSONPatternFormatter(Poco::Util::AbstractConfiguration & config);
|
||||
|
||||
void format(const Poco::Message & msg, std::string & text) override;
|
||||
void formatExtended(const DB::ExtendedLogMessage & msg_ext, std::string & text) const override;
|
||||
|
||||
private:
|
||||
std::string date_time;
|
||||
std::string thread_name;
|
||||
std::string thread_id;
|
||||
std::string level;
|
||||
std::string query_id;
|
||||
std::string logger_name;
|
||||
std::string message;
|
||||
std::string source_file;
|
||||
std::string source_line;
|
||||
};
|
||||
|
@ -865,6 +865,7 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
|
||||
|
||||
size_t total_parts = parts.size();
|
||||
|
||||
/// TODO Support row_policy_filter and additional_filters
|
||||
auto part_values = MergeTreeDataSelectExecutor::filterPartsByVirtualColumns(data, parts, query_info.query, context);
|
||||
if (part_values && part_values->empty())
|
||||
return std::make_shared<MergeTreeDataSelectAnalysisResult>(MergeTreeDataSelectAnalysisResult{.result = std::move(result)});
|
||||
@ -923,6 +924,9 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
|
||||
}
|
||||
LOG_DEBUG(log, "Key condition: {}", key_condition->toString());
|
||||
|
||||
if (key_condition->alwaysFalse())
|
||||
return std::make_shared<MergeTreeDataSelectAnalysisResult>(MergeTreeDataSelectAnalysisResult{.result = std::move(result)});
|
||||
|
||||
const auto & select = query_info.query->as<ASTSelectQuery &>();
|
||||
|
||||
size_t total_marks_pk = 0;
|
||||
|
@ -2463,7 +2463,6 @@ BoolMask KeyCondition::checkInHyperrectangle(
|
||||
return rpn_stack[0];
|
||||
}
|
||||
|
||||
|
||||
bool KeyCondition::mayBeTrueInRange(
|
||||
size_t used_key_size,
|
||||
const FieldRef * left_keys,
|
||||
@ -2474,6 +2473,7 @@ bool KeyCondition::mayBeTrueInRange(
|
||||
}
|
||||
|
||||
String KeyCondition::RPNElement::toString() const { return toString("column " + std::to_string(key_column), false); }
|
||||
|
||||
String KeyCondition::RPNElement::toString(std::string_view column_name, bool print_constants) const
|
||||
{
|
||||
auto print_wrapped_column = [this, &column_name, print_constants](WriteBuffer & buf)
|
||||
@ -2563,10 +2563,12 @@ bool KeyCondition::alwaysUnknownOrTrue() const
|
||||
{
|
||||
return unknownOrAlwaysTrue(false);
|
||||
}
|
||||
|
||||
bool KeyCondition::anyUnknownOrAlwaysTrue() const
|
||||
{
|
||||
return unknownOrAlwaysTrue(true);
|
||||
}
|
||||
|
||||
bool KeyCondition::unknownOrAlwaysTrue(bool unknown_any) const
|
||||
{
|
||||
std::vector<UInt8> rpn_stack;
|
||||
@ -2627,6 +2629,80 @@ bool KeyCondition::unknownOrAlwaysTrue(bool unknown_any) const
|
||||
return rpn_stack[0];
|
||||
}
|
||||
|
||||
bool KeyCondition::alwaysFalse() const
|
||||
{
|
||||
/// 0: always_false, 1: always_true, 2: non_const
|
||||
std::vector<UInt8> rpn_stack;
|
||||
|
||||
for (const auto & element : rpn)
|
||||
{
|
||||
if (element.function == RPNElement::ALWAYS_TRUE)
|
||||
{
|
||||
rpn_stack.push_back(1);
|
||||
}
|
||||
else if (element.function == RPNElement::ALWAYS_FALSE)
|
||||
{
|
||||
rpn_stack.push_back(0);
|
||||
}
|
||||
else if (element.function == RPNElement::FUNCTION_NOT_IN_RANGE
|
||||
|| element.function == RPNElement::FUNCTION_IN_RANGE
|
||||
|| element.function == RPNElement::FUNCTION_IN_SET
|
||||
|| element.function == RPNElement::FUNCTION_NOT_IN_SET
|
||||
|| element.function == RPNElement::FUNCTION_IS_NULL
|
||||
|| element.function == RPNElement::FUNCTION_IS_NOT_NULL
|
||||
|| element.function == RPNElement::FUNCTION_UNKNOWN)
|
||||
{
|
||||
rpn_stack.push_back(2);
|
||||
}
|
||||
else if (element.function == RPNElement::FUNCTION_NOT)
|
||||
{
|
||||
assert(!rpn_stack.empty());
|
||||
|
||||
auto & arg = rpn_stack.back();
|
||||
if (arg == 0)
|
||||
arg = 1;
|
||||
else if (arg == 1)
|
||||
arg = 0;
|
||||
}
|
||||
else if (element.function == RPNElement::FUNCTION_AND)
|
||||
{
|
||||
assert(!rpn_stack.empty());
|
||||
|
||||
auto arg1 = rpn_stack.back();
|
||||
rpn_stack.pop_back();
|
||||
auto arg2 = rpn_stack.back();
|
||||
|
||||
if (arg1 == 0 || arg2 == 0)
|
||||
rpn_stack.back() = 0;
|
||||
else if (arg1 == 1 && arg2 == 1)
|
||||
rpn_stack.back() = 1;
|
||||
else
|
||||
rpn_stack.back() = 2;
|
||||
}
|
||||
else if (element.function == RPNElement::FUNCTION_OR)
|
||||
{
|
||||
assert(!rpn_stack.empty());
|
||||
|
||||
auto arg1 = rpn_stack.back();
|
||||
rpn_stack.pop_back();
|
||||
auto arg2 = rpn_stack.back();
|
||||
|
||||
if (arg1 == 1 || arg2 == 1)
|
||||
rpn_stack.back() = 1;
|
||||
else if (arg1 == 0 && arg2 == 0)
|
||||
rpn_stack.back() = 0;
|
||||
else
|
||||
rpn_stack.back() = 2;
|
||||
}
|
||||
else
|
||||
throw Exception("Unexpected function type in KeyCondition::RPNElement", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
if (rpn_stack.size() != 1)
|
||||
throw Exception("Unexpected stack size in KeyCondition::alwaysFalse", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
return rpn_stack[0] == 0;
|
||||
}
|
||||
|
||||
size_t KeyCondition::getMaxKeyColumn() const
|
||||
{
|
||||
|
@ -279,6 +279,8 @@ public:
|
||||
/// Does not allow any FUNCTION_UNKNOWN (will instantly return true).
|
||||
bool anyUnknownOrAlwaysTrue() const;
|
||||
|
||||
bool alwaysFalse() const;
|
||||
|
||||
/// Get the maximum number of the key element used in the condition.
|
||||
size_t getMaxKeyColumn() const;
|
||||
|
||||
|
@ -179,12 +179,14 @@ void StorageView::replaceWithSubquery(ASTSelectQuery & outer_query, ASTPtr view_
|
||||
|
||||
if (!table_expression->database_and_table_name)
|
||||
{
|
||||
// If it's a view table function, add a fake db.table name.
|
||||
// If it's a view or merge table function, add a fake db.table name.
|
||||
if (table_expression->table_function)
|
||||
{
|
||||
auto table_function_name = table_expression->table_function->as<ASTFunction>()->name;
|
||||
if ((table_function_name == "view") || (table_function_name == "viewIfPermitted"))
|
||||
if (table_function_name == "view" || table_function_name == "viewIfPermitted")
|
||||
table_expression->database_and_table_name = std::make_shared<ASTTableIdentifier>("__view");
|
||||
if (table_function_name == "merge")
|
||||
table_expression->database_and_table_name = std::make_shared<ASTTableIdentifier>("__merge");
|
||||
}
|
||||
if (!table_expression->database_and_table_name)
|
||||
throw Exception("Logical error: incorrect table expression", ErrorCodes::LOGICAL_ERROR);
|
||||
|
@ -2,9 +2,9 @@
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <DataTypes/DataTypeTuple.h>
|
||||
#include <Common/FileCache.h>
|
||||
#include <Common/FileSegment.h>
|
||||
#include <Common/FileCacheFactory.h>
|
||||
#include <Interpreters/Cache/FileCache.h>
|
||||
#include <Interpreters/Cache/FileSegment.h>
|
||||
#include <Interpreters/Cache/FileCacheFactory.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Disks/IDisk.h>
|
||||
|
||||
|
@ -1,8 +1,8 @@
|
||||
#include "StorageSystemRemoteDataPaths.h"
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <DataTypes/DataTypeArray.h>
|
||||
#include <Common/FileCache.h>
|
||||
#include <Common/FileCacheFactory.h>
|
||||
#include <Interpreters/Cache/FileCache.h>
|
||||
#include <Interpreters/Cache/FileCacheFactory.h>
|
||||
#include <Columns/ColumnString.h>
|
||||
#include <Columns/ColumnArray.h>
|
||||
#include <Interpreters/Context.h>
|
||||
|
@ -0,0 +1,4 @@
|
||||
<clickhouse>
|
||||
<asynchronous_metrics_update_period_s>1</asynchronous_metrics_update_period_s>
|
||||
<asynchronous_heavy_metrics_update_period_s>1</asynchronous_heavy_metrics_update_period_s>
|
||||
</clickhouse>
|
133
tests/integration/test_detached_parts_metrics/test.py
Normal file
133
tests/integration/test_detached_parts_metrics/test.py
Normal file
@ -0,0 +1,133 @@
|
||||
import time
|
||||
import pytest
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
from helpers.test_tools import assert_eq_with_retry
|
||||
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
node1 = cluster.add_instance(
|
||||
"node1",
|
||||
main_configs=["configs/asynchronous_metrics_update_period_s.xml"],
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def started_cluster():
|
||||
try:
|
||||
cluster.start()
|
||||
yield cluster
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
def test_event_time_microseconds_field(started_cluster):
|
||||
cluster.start()
|
||||
query_create = """
|
||||
CREATE TABLE t
|
||||
(
|
||||
id Int64,
|
||||
event_time Date
|
||||
)
|
||||
Engine=MergeTree()
|
||||
PARTITION BY toYYYYMMDD(event_time)
|
||||
ORDER BY id;
|
||||
"""
|
||||
node1.query(query_create)
|
||||
|
||||
# gives us 2 partitions with 3 parts in total
|
||||
node1.query("INSERT INTO t VALUES (1, toDate('2022-09-01'));")
|
||||
node1.query("INSERT INTO t VALUES (2, toDate('2022-08-29'));")
|
||||
node1.query("INSERT INTO t VALUES (3, toDate('2022-09-01'));")
|
||||
|
||||
query_number_detached_parts_in_async_metric = """
|
||||
SELECT value
|
||||
FROM system.asynchronous_metrics
|
||||
WHERE metric LIKE 'NumberOfDetachedParts';
|
||||
"""
|
||||
query_number_detached_by_user_parts_in_async_metric = """
|
||||
SELECT value
|
||||
FROM system.asynchronous_metrics
|
||||
WHERE metric LIKE 'NumberOfDetachedByUserParts';
|
||||
"""
|
||||
query_count_active_parts = """
|
||||
SELECT count(*) FROM system.parts WHERE table = 't' AND active
|
||||
"""
|
||||
query_count_detached_parts = """
|
||||
SELECT count(*) FROM system.detached_parts WHERE table = 't'
|
||||
"""
|
||||
|
||||
query_one_partition_name = """
|
||||
SELECT name FROM system.parts WHERE table = 't' AND active AND partition = '20220829'
|
||||
"""
|
||||
partition_name = node1.query(query_one_partition_name).strip()
|
||||
|
||||
assert 0 == int(node1.query(query_count_detached_parts))
|
||||
assert 3 == int(node1.query(query_count_active_parts))
|
||||
assert 0 == int(node1.query(query_number_detached_parts_in_async_metric))
|
||||
assert 0 == int(node1.query(query_number_detached_by_user_parts_in_async_metric))
|
||||
|
||||
# detach some parts and wait until asynchronous metrics notice it
|
||||
node1.query("ALTER TABLE t DETACH PARTITION '20220901';")
|
||||
|
||||
assert 2 == int(node1.query(query_count_detached_parts))
|
||||
assert 1 == int(node1.query(query_count_active_parts))
|
||||
|
||||
assert_eq_with_retry(
|
||||
node1,
|
||||
query_number_detached_parts_in_async_metric,
|
||||
"2\n",
|
||||
)
|
||||
assert 2 == int(node1.query(query_number_detached_by_user_parts_in_async_metric))
|
||||
|
||||
# detach the rest parts and wait until asynchronous metrics notice it
|
||||
node1.query("ALTER TABLE t DETACH PARTITION ALL")
|
||||
|
||||
assert 3 == int(node1.query(query_count_detached_parts))
|
||||
assert 0 == int(node1.query(query_count_active_parts))
|
||||
|
||||
assert_eq_with_retry(
|
||||
node1,
|
||||
query_number_detached_parts_in_async_metric,
|
||||
"3\n",
|
||||
)
|
||||
assert 3 == int(node1.query(query_number_detached_by_user_parts_in_async_metric))
|
||||
|
||||
# inject some data directly and wait until asynchronous metrics notice it
|
||||
node1.exec_in_container(
|
||||
[
|
||||
"bash",
|
||||
"-c",
|
||||
"mkdir /var/lib/clickhouse/data/default/t/detached/unexpected_all_0_0_0",
|
||||
]
|
||||
)
|
||||
|
||||
assert 4 == int(node1.query(query_count_detached_parts))
|
||||
assert 0 == int(node1.query(query_count_active_parts))
|
||||
|
||||
assert_eq_with_retry(
|
||||
node1,
|
||||
query_number_detached_parts_in_async_metric,
|
||||
"4\n",
|
||||
)
|
||||
assert 3 == int(node1.query(query_number_detached_by_user_parts_in_async_metric))
|
||||
|
||||
# drop some data directly and wait asynchronous metrics notice it
|
||||
node1.exec_in_container(
|
||||
[
|
||||
"bash",
|
||||
"-c",
|
||||
"rm -rf /var/lib/clickhouse/data/default/t/detached/{}".format(
|
||||
partition_name
|
||||
),
|
||||
]
|
||||
)
|
||||
|
||||
assert 3 == int(node1.query(query_count_detached_parts))
|
||||
assert 0 == int(node1.query(query_count_active_parts))
|
||||
|
||||
assert_eq_with_retry(
|
||||
node1,
|
||||
query_number_detached_parts_in_async_metric,
|
||||
"3\n",
|
||||
)
|
||||
assert 2 == int(node1.query(query_number_detached_by_user_parts_in_async_metric))
|
@ -0,0 +1,27 @@
|
||||
<?xml version="1.0"?>
|
||||
<clickhouse>
|
||||
<logger>
|
||||
<!-- Structured log formatting:
|
||||
You can specify log format(for now, JSON only). In that case, the console log will be printed
|
||||
in specified format like JSON.
|
||||
For example, as below:
|
||||
{"date_time":"1650918987.180175","thread_name":"#1","thread_id":"254545","level":"Trace","query_id":"","logger_name":"BaseDaemon","message":"Received signal 2","source_file":"../base/daemon/BaseDaemon.cpp; virtual void SignalListener::run()","source_line":"192"}
|
||||
To enable JSON logging support, just uncomment <formatting> tag below.
|
||||
-->
|
||||
<formatting>
|
||||
<type>json</type>
|
||||
<names>
|
||||
<date_time>DATE_TIME</date_time>
|
||||
<thread_name>THREAD_NAME</thread_name>
|
||||
<thread_id>THREAD_ID</thread_id>
|
||||
<level>LEVEL</level>
|
||||
<query_id>QUERY_ID</query_id>
|
||||
<logger_name>LOGGER_NAME</logger_name>
|
||||
<message>MESSAGE</message>
|
||||
<source_file>SOURCE_FILE</source_file>
|
||||
<source_line>SOURCE_LINE</source_line>
|
||||
</names>
|
||||
</formatting>
|
||||
</logger>
|
||||
|
||||
</clickhouse>
|
@ -8,7 +8,20 @@
|
||||
{"date_time":"1650918987.180175","thread_name":"#1","thread_id":"254545","level":"Trace","query_id":"","logger_name":"BaseDaemon","message":"Received signal 2","source_file":"../base/daemon/BaseDaemon.cpp; virtual void SignalListener::run()","source_line":"192"}
|
||||
To enable JSON logging support, just uncomment <formatting> tag below.
|
||||
-->
|
||||
<formatting>json</formatting>
|
||||
<formatting>
|
||||
<type>json</type>
|
||||
<names>
|
||||
<date_time>DATE_TIME</date_time>
|
||||
<thread_name>THREAD_NAME</thread_name>
|
||||
<thread_id>THREAD_ID</thread_id>
|
||||
<level>LEVEL</level>
|
||||
<query_id>QUERY_ID</query_id>
|
||||
<logger_name>LOGGER_NAME</logger_name>
|
||||
<message>MESSAGE</message>
|
||||
<source_file>SOURCE_FILE</source_file>
|
||||
<source_line>SOURCE_LINE</source_line>
|
||||
</names>
|
||||
</formatting>
|
||||
</logger>
|
||||
|
||||
</clickhouse>
|
||||
|
@ -0,0 +1,27 @@
|
||||
<?xml version="1.0"?>
|
||||
<clickhouse>
|
||||
<logger>
|
||||
<!-- Structured log formatting:
|
||||
You can specify log format(for now, JSON only). In that case, the console log will be printed
|
||||
in specified format like JSON.
|
||||
For example, as below:
|
||||
{"date_time":"1650918987.180175","thread_name":"#1","thread_id":"254545","level":"Trace","query_id":"","logger_name":"BaseDaemon","message":"Received signal 2","source_file":"../base/daemon/BaseDaemon.cpp; virtual void SignalListener::run()","source_line":"192"}
|
||||
To enable JSON logging support, just uncomment <formatting> tag below.
|
||||
-->
|
||||
<formatting>
|
||||
<type>json</type>
|
||||
<!--<names>
|
||||
<date_time>DATE_TIME</date_time>
|
||||
<thread_name>THREAD_NAME</thread_name>
|
||||
<thread_id>THREAD_ID</thread_id>
|
||||
<level>LEVEL</level>
|
||||
<query_id>QUERY_ID</query_id>
|
||||
<logger_name>LOGGER_NAME</logger_name>
|
||||
<message>MESSAGE</message>
|
||||
<source_file>SOURCE_FILE</source_file>
|
||||
<source_line>SOURCE_LINE</source_line>
|
||||
</names>-->
|
||||
</formatting>
|
||||
</logger>
|
||||
|
||||
</clickhouse>
|
@ -0,0 +1,27 @@
|
||||
<?xml version="1.0"?>
|
||||
<clickhouse>
|
||||
<logger>
|
||||
<!-- Structured log formatting:
|
||||
You can specify log format(for now, JSON only). In that case, the console log will be printed
|
||||
in specified format like JSON.
|
||||
For example, as below:
|
||||
{"date_time":"1650918987.180175","thread_name":"#1","thread_id":"254545","level":"Trace","query_id":"","logger_name":"BaseDaemon","message":"Received signal 2","source_file":"../base/daemon/BaseDaemon.cpp; virtual void SignalListener::run()","source_line":"192"}
|
||||
To enable JSON logging support, just uncomment <formatting> tag below.
|
||||
-->
|
||||
<formatting>
|
||||
<type>json</type>
|
||||
<names>
|
||||
<date_time>DATE_TIME</date_time>
|
||||
<thread_name>THREAD_NAME</thread_name>
|
||||
<thread_id>THREAD_ID</thread_id>
|
||||
<level>LEVEL</level>
|
||||
<!--<query_id>QUERY_ID</query_id>
|
||||
<logger_name>LOGGER_NAME</logger_name>-->
|
||||
<message>MESSAGE</message>
|
||||
<source_file>SOURCE_FILE</source_file>
|
||||
<!--<source_line>SOURCE_LINE</source_line>-->
|
||||
</names>
|
||||
</formatting>
|
||||
</logger>
|
||||
|
||||
</clickhouse>
|
@ -1,9 +1,18 @@
|
||||
import pytest
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
import json
|
||||
from xml.etree import ElementTree as ET
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
node = cluster.add_instance("node", main_configs=["configs/config_json.xml"])
|
||||
node_all_keys = cluster.add_instance(
|
||||
"node_all_keys", main_configs=["configs/config_all_keys_json.xml"]
|
||||
)
|
||||
node_some_keys = cluster.add_instance(
|
||||
"node_some_keys", main_configs=["configs/config_some_keys_json.xml"]
|
||||
)
|
||||
node_no_keys = cluster.add_instance(
|
||||
"node_no_keys", main_configs=["configs/config_no_keys_json.xml"]
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
@ -23,10 +32,70 @@ def is_json(log_json):
|
||||
return True
|
||||
|
||||
|
||||
def test_structured_logging_json_format(start_cluster):
|
||||
node.query("SELECT 1")
|
||||
def validate_log_config_relation(config, logs, config_type):
|
||||
root = ET.fromstring(config)
|
||||
keys_in_config = set()
|
||||
|
||||
logs = node.grep_in_log("").split("\n")
|
||||
if config_type == "config_no_keys":
|
||||
keys_in_config.add("date_time")
|
||||
keys_in_config.add("thread_name")
|
||||
keys_in_config.add("thread_id")
|
||||
keys_in_config.add("level")
|
||||
keys_in_config.add("query_id")
|
||||
keys_in_config.add("logger_name")
|
||||
keys_in_config.add("message")
|
||||
keys_in_config.add("source_file")
|
||||
keys_in_config.add("source_line")
|
||||
else:
|
||||
for child in root.findall(".//names/*"):
|
||||
keys_in_config.add(child.text)
|
||||
|
||||
try:
|
||||
length = min(10, len(logs))
|
||||
for i in range(0, length):
|
||||
json_log = json.loads(logs[i])
|
||||
keys_in_log = set()
|
||||
for log_key in json_log.keys():
|
||||
keys_in_log.add(log_key)
|
||||
if log_key not in keys_in_config:
|
||||
return False
|
||||
for config_key in keys_in_config:
|
||||
if config_key not in keys_in_log:
|
||||
return False
|
||||
except ValueError as e:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def validate_logs(logs):
|
||||
length = min(10, len(logs))
|
||||
result = True
|
||||
for i in range(0, length):
|
||||
assert is_json(logs[i])
|
||||
result = result and is_json(logs[i])
|
||||
return result
|
||||
|
||||
|
||||
def valiade_everything(config, node, config_type):
|
||||
node.query("SELECT 1")
|
||||
logs = node.grep_in_log("").split("\n")
|
||||
return validate_logs(logs) and validate_log_config_relation(
|
||||
config, logs, config_type
|
||||
)
|
||||
|
||||
|
||||
def test_structured_logging_json_format(start_cluster):
|
||||
config_all_keys = node_all_keys.exec_in_container(
|
||||
["cat", "/etc/clickhouse-server/config.d/config_all_keys_json.xml"]
|
||||
)
|
||||
config_some_keys = node_some_keys.exec_in_container(
|
||||
["cat", "/etc/clickhouse-server/config.d/config_some_keys_json.xml"]
|
||||
)
|
||||
config_no_keys = node_no_keys.exec_in_container(
|
||||
["cat", "/etc/clickhouse-server/config.d/config_no_keys_json.xml"]
|
||||
)
|
||||
|
||||
assert valiade_everything(config_all_keys, node_all_keys, "config_all_keys") == True
|
||||
assert (
|
||||
valiade_everything(config_some_keys, node_some_keys, "config_some_keys") == True
|
||||
)
|
||||
assert valiade_everything(config_no_keys, node_no_keys, "config_no_keys") == True
|
||||
|
@ -29,3 +29,7 @@ PM
|
||||
no formatting pattern
|
||||
2018-01-01 00:00:00
|
||||
2018-01-01 01:00:00 2018-01-01 04:00:00
|
||||
+0000
|
||||
-1100
|
||||
+0300
|
||||
+0530
|
||||
|
@ -42,4 +42,9 @@ SELECT formatDateTime(toDateTime('2018-01-02 22:33:44'), 'no formatting pattern'
|
||||
SELECT formatDateTime(toDate('2018-01-01'), '%F %T');
|
||||
SELECT
|
||||
formatDateTime(toDateTime('2018-01-01 01:00:00', 'UTC'), '%F %T', 'UTC'),
|
||||
formatDateTime(toDateTime('2018-01-01 01:00:00', 'UTC'), '%F %T', 'Asia/Istanbul')
|
||||
formatDateTime(toDateTime('2018-01-01 01:00:00', 'UTC'), '%F %T', 'Asia/Istanbul');
|
||||
|
||||
SELECT formatDateTime(toDateTime('2020-01-01 01:00:00', 'UTC'), '%z');
|
||||
SELECT formatDateTime(toDateTime('2020-01-01 01:00:00', 'US/Samoa'), '%z');
|
||||
SELECT formatDateTime(toDateTime('2020-01-01 01:00:00', 'Europe/Moscow'), '%z');
|
||||
SELECT formatDateTime(toDateTime('1970-01-01 00:00:00', 'Asia/Kolkata'), '%z');
|
||||
|
@ -12,3 +12,7 @@
|
||||
2 (0,2) Tuple(x Int8, y Int8)
|
||||
{"x":1}
|
||||
{"x":1}
|
||||
{"x":[[1],[1,2]]}
|
||||
{"x":[[],[1,2]]}
|
||||
{"x":[[],[[1],[2]]]}
|
||||
{"x":[[],[[],[2]]]}
|
||||
|
@ -26,5 +26,9 @@ SELECT id, data, toTypeName(data) FROM t_object_convert2 ORDER BY id;
|
||||
DROP TABLE t_object_convert;
|
||||
DROP TABLE t_object_convert2;
|
||||
|
||||
select CAST(CAST('{"x" : 1}', 'Object(\'json\')'), 'Object(Nullable(\'json\'))');
|
||||
select CAST(CAST('{"x" : 1}', 'Object(Nullable(\'json\'))'), 'Object(\'json\')');
|
||||
SELECT CAST(CAST('{"x" : 1}', 'Object(\'json\')'), 'Object(Nullable(\'json\'))');
|
||||
SELECT CAST(CAST('{"x" : 1}', 'Object(Nullable(\'json\'))'), 'Object(\'json\')');
|
||||
SELECT CAST('{"x" : [ 1 , [ 1 , 2] ]}', 'Object(\'json\')');
|
||||
SELECT CAST('{"x" : [ {} , [ 1 , 2] ]}', 'Object(\'json\')');
|
||||
SELECT CAST('{"x" : [ {} , [ 1 , [2]] ]}', 'Object(\'json\')');
|
||||
SELECT CAST('{"x" : [ {} , [ {} , [2]] ]}', 'Object(\'json\')');
|
||||
|
@ -4,3 +4,4 @@
|
||||
3
|
||||
4
|
||||
4
|
||||
1
|
||||
|
@ -1,4 +1,4 @@
|
||||
|
||||
-- #40014
|
||||
CREATE TABLE m0 (id UInt64) ENGINE=MergeTree ORDER BY id SETTINGS index_granularity = 1;
|
||||
INSERT INTO m0 SELECT number FROM numbers(10);
|
||||
CREATE TABLE m1 (id UInt64, s String) ENGINE=MergeTree ORDER BY id SETTINGS index_granularity = 1;
|
||||
@ -7,4 +7,8 @@ CREATE VIEW m1v AS SELECT id FROM m1;
|
||||
|
||||
CREATE TABLE m2 (id UInt64) ENGINE=Merge(currentDatabase(),'m0|m1v');
|
||||
|
||||
SELECT * FROM m2 WHERE id > 1 AND id < 5 ORDER BY id SETTINGS force_primary_key=1, max_bytes_to_read=64;
|
||||
SELECT * FROM m2 WHERE id > 1 AND id < 5 ORDER BY id SETTINGS force_primary_key=1, max_bytes_to_read=64;
|
||||
|
||||
-- #40706
|
||||
CREATE VIEW v AS SELECT 1;
|
||||
SELECT 1 FROM merge(currentDatabase(), '^v$');
|
@ -0,0 +1,8 @@
|
||||
c1 Nullable(String)
|
||||
c2 Nullable(String)
|
||||
c3 Nullable(String)
|
||||
c4 Nullable(String)
|
||||
c1 Nullable(Int64)
|
||||
c2 Nullable(String)
|
||||
c3 Nullable(String)
|
||||
c4 Nullable(String)
|
@ -0,0 +1,2 @@
|
||||
desc format(CSV, ',,,');
|
||||
desc format(CSV, '123,,abv,')
|
@ -0,0 +1,17 @@
|
||||
drop table if exists tbl;
|
||||
|
||||
create table tbl (s String, i int) engine MergeTree order by i;
|
||||
|
||||
insert into tbl values ('123', 123);
|
||||
|
||||
drop row policy if exists filter on tbl;
|
||||
|
||||
create row policy filter on tbl using 0 to all;
|
||||
|
||||
set max_rows_to_read = 0;
|
||||
|
||||
select * from tbl;
|
||||
|
||||
drop row policy filter on tbl;
|
||||
|
||||
drop table tbl;
|
@ -1,3 +1,4 @@
|
||||
v22.8.4.7-lts 2022-08-31
|
||||
v22.8.3.13-lts 2022-08-29
|
||||
v22.8.2.11-lts 2022-08-23
|
||||
v22.8.1.2097-lts 2022-08-18
|
||||
|
|
Loading…
Reference in New Issue
Block a user