Do not allow creating replicated table with inconsistent merge params (#56833)

* save all merge params to zookeeper

Signed-off-by: Duc Canh Le <duccanh.le@ahrefs.com>

* calculate hash for graphite merge params

Signed-off-by: Duc Canh Le <duccanh.le@ahrefs.com>

* add graphite params hash to zookeeper + fix tests

Signed-off-by: Duc Canh Le <duccanh.le@ahrefs.com>

* install new graphite for testing

Signed-off-by: Duc Canh Le <duccanh.le@ahrefs.com>

* fix backward incompatibility

Signed-off-by: Duc Canh Le <duccanh.le@ahrefs.com>

* minor fix test

Signed-off-by: Duc Canh Le <duccanh.le@ahrefs.com>

* Update src/Storages/MergeTree/ReplicatedMergeTreeTableMetadata.cpp

Co-authored-by: Alexander Tokmakov <tavplubix@gmail.com>

* remove peekString and add more comments

- peekString doesn't always work even for ReadBufferFromString
- more comment re. backward compatibility

Signed-off-by: Duc Canh Le <duccanh.le@ahrefs.com>

---------

Signed-off-by: Duc Canh Le <duccanh.le@ahrefs.com>
Co-authored-by: Alexander Tokmakov <tavplubix@gmail.com>
This commit is contained in:
Duc Canh Le 2023-12-15 00:26:35 +08:00 committed by GitHub
parent a0af0392cd
commit 93dd6b83e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 231 additions and 2 deletions

View File

@ -1,5 +1,6 @@
#pragma once
#include <Common/SipHash.h>
#include <Common/OptimizedRegularExpression.h>
#include <AggregateFunctions/IAggregateFunction.h>
@ -122,6 +123,17 @@ struct Pattern
AggregateFunctionPtr function;
Retentions retentions; /// Must be ordered by 'age' descending.
enum { TypeUndef, TypeRetention, TypeAggregation, TypeAll } type = TypeAll; /// The type of defined pattern, filled automatically
void updateHash(SipHash & hash) const
{
hash.update(rule_type);
hash.update(regexp_str);
hash.update(function->getName());
for (const auto & r : retentions)
{
hash.update(r.age);
hash.update(r.precision);
}
}
};
bool operator==(const Pattern & a, const Pattern & b);
@ -142,6 +154,21 @@ struct Params
Graphite::Patterns patterns;
Graphite::Patterns patterns_plain;
Graphite::Patterns patterns_tagged;
void updateHash(SipHash & hash) const
{
hash.update(path_column_name);
hash.update(time_column_name);
hash.update(value_column_name);
hash.update(value_column_name);
hash.update(version_column_name);
hash.update(patterns_typed);
for (const auto & p : patterns)
p.updateHash(hash);
for (const auto & p : patterns_plain)
p.updateHash(hash);
for (const auto & p : patterns_tagged)
p.updateHash(hash);
}
};
using RollupRule = std::pair<const RetentionPattern *, const AggregationPattern *>;

View File

@ -6,6 +6,9 @@
#include <Parsers/ExpressionListParsers.h>
#include <IO/Operators.h>
#include <Interpreters/FunctionNameNormalizer.h>
#include <Common/SipHash.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
namespace DB
@ -49,6 +52,17 @@ ReplicatedMergeTreeTableMetadata::ReplicatedMergeTreeTableMetadata(const MergeTr
index_granularity = data_settings->index_granularity;
merging_params_mode = static_cast<int>(data.merging_params.mode);
sign_column = data.merging_params.sign_column;
is_deleted_column = data.merging_params.is_deleted_column;
columns_to_sum = fmt::format("{}", fmt::join(data.merging_params.columns_to_sum.begin(), data.merging_params.columns_to_sum.end(), ","));
version_column = data.merging_params.version_column;
if (data.merging_params.mode == MergeTreeData::MergingParams::Graphite)
{
SipHash graphite_hash;
data.merging_params.graphite_params.updateHash(graphite_hash);
WriteBufferFromOwnString wb;
writeText(graphite_hash.get128(), wb);
graphite_params_hash = std::move(wb.str());
}
/// This code may looks strange, but previously we had only one entity: PRIMARY KEY (or ORDER BY, it doesn't matter)
/// Now we have two different entities ORDER BY and it's optional prefix -- PRIMARY KEY.
@ -90,6 +104,22 @@ ReplicatedMergeTreeTableMetadata::ReplicatedMergeTreeTableMetadata(const MergeTr
void ReplicatedMergeTreeTableMetadata::write(WriteBuffer & out) const
{
/// Important notes: new added field must always be append to the end of serialized metadata
/// for backward compatible.
/// In addition, two consecutive fields should not share any prefix, otherwise deserialize may fails.
/// For example, if you have two field `v1` and `v2` serialized as:
/// if (!v1.empty()) out << "v1: " << v1 << "\n";
/// if (!v2.empty()) out << "v2: " << v2 << "\n";
/// Let say if `v1` is empty and v2 is non-empty, then `v1` is not in serialized metadata.
/// Later, to deserialize the metadata, `read` will sequentially check if each field with `checkString`.
/// When it begin to check for `v1` and `v2`, the metadata buffer look like this:
/// v2: <v2 value>
/// ^
/// cursor
/// `checkString("v1: ", in)` will be called first and it moves the cursor to `2` instead of `v`, so the
/// subsequent call `checkString("v2: ", in)` will also fails.
out << "metadata format version: 1\n"
<< "date column: " << date_column << "\n"
<< "sampling expression: " << sampling_expression << "\n"
@ -121,6 +151,19 @@ void ReplicatedMergeTreeTableMetadata::write(WriteBuffer & out) const
if (!constraints.empty())
out << "constraints: " << constraints << "\n";
if (merge_params_version >= REPLICATED_MERGE_TREE_METADATA_WITH_ALL_MERGE_PARAMETERS)
{
out << "merge parameters format version: " << merge_params_version << "\n";
if (!version_column.empty())
out << "version column: " << version_column << "\n";
if (!is_deleted_column.empty())
out << "is_deleted column: " << is_deleted_column << "\n";
if (!columns_to_sum.empty())
out << "columns to sum: " << columns_to_sum << "\n";
if (!graphite_params_hash.empty())
out << "graphite hash: " << graphite_params_hash << "\n";
}
}
String ReplicatedMergeTreeTableMetadata::toString() const
@ -170,6 +213,26 @@ void ReplicatedMergeTreeTableMetadata::read(ReadBuffer & in)
if (checkString("constraints: ", in))
in >> constraints >> "\n";
if (checkString("merge parameters format version: ", in))
in >> merge_params_version >> "\n";
else
merge_params_version = REPLICATED_MERGE_TREE_METADATA_LEGACY_VERSION;
if (merge_params_version >= REPLICATED_MERGE_TREE_METADATA_WITH_ALL_MERGE_PARAMETERS)
{
if (checkString("version column: ", in))
in >> version_column >> "\n";
if (checkString("is_deleted column: ", in))
in >> is_deleted_column >> "\n";
if (checkString("columns to sum: ", in))
in >> columns_to_sum >> "\n";
if (checkString("graphite hash: ", in))
in >> graphite_params_hash >> "\n";
}
}
ReplicatedMergeTreeTableMetadata ReplicatedMergeTreeTableMetadata::parse(const String & s)
@ -210,6 +273,25 @@ void ReplicatedMergeTreeTableMetadata::checkImmutableFieldsEquals(const Replicat
throw Exception(ErrorCodes::METADATA_MISMATCH, "Existing table metadata in ZooKeeper differs in sign column. "
"Stored in ZooKeeper: {}, local: {}", from_zk.sign_column, sign_column);
if (merge_params_version >= REPLICATED_MERGE_TREE_METADATA_WITH_ALL_MERGE_PARAMETERS && from_zk.merge_params_version >= REPLICATED_MERGE_TREE_METADATA_WITH_ALL_MERGE_PARAMETERS)
{
if (version_column != from_zk.version_column)
throw Exception(ErrorCodes::METADATA_MISMATCH, "Existing table metadata in ZooKeeper differs in version column. "
"Stored in ZooKeeper: {}, local: {}", from_zk.version_column, version_column);
if (is_deleted_column != from_zk.is_deleted_column)
throw Exception(ErrorCodes::METADATA_MISMATCH, "Existing table metadata in ZooKeeper differs in is_deleted column. "
"Stored in ZooKeeper: {}, local: {}", from_zk.is_deleted_column, is_deleted_column);
if (columns_to_sum != from_zk.columns_to_sum)
throw Exception(ErrorCodes::METADATA_MISMATCH, "Existing table metadata in ZooKeeper differs in sum columns. "
"Stored in ZooKeeper: {}, local: {}", from_zk.columns_to_sum, columns_to_sum);
if (graphite_params_hash != from_zk.graphite_params_hash)
throw Exception(ErrorCodes::METADATA_MISMATCH, "Existing table metadata in ZooKeeper differs in graphite params. "
"Stored in ZooKeeper hash: {}, local hash: {}", from_zk.graphite_params_hash, graphite_params_hash);
}
/// NOTE: You can make a less strict check of match expressions so that tables do not break from small changes
/// in formatAST code.
String parsed_zk_primary_key = formattedAST(KeyDescription::parse(from_zk.primary_key, columns, context).expression_list_ast);

View File

@ -4,6 +4,7 @@
#include <Storages/MergeTree/MergeTreeDataFormatVersion.h>
#include <base/types.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <IO/ReadBufferFromString.h>
namespace DB
{
@ -17,11 +18,20 @@ class ReadBuffer;
*/
struct ReplicatedMergeTreeTableMetadata
{
static constexpr int REPLICATED_MERGE_TREE_METADATA_LEGACY_VERSION = 1;
static constexpr int REPLICATED_MERGE_TREE_METADATA_WITH_ALL_MERGE_PARAMETERS = 2;
String date_column;
String sampling_expression;
UInt64 index_granularity;
/// Merging related params
int merging_params_mode;
int merge_params_version = REPLICATED_MERGE_TREE_METADATA_WITH_ALL_MERGE_PARAMETERS;
String sign_column;
String version_column;
String is_deleted_column;
String columns_to_sum;
String graphite_params_hash;
String primary_key;
MergeTreeDataFormatVersion data_format_version;
String partition_key;

View File

@ -0,0 +1,29 @@
<!-- alternative graphite config, for testing 02910_replicated_merge_parameters_must_consistent -->
<clickhouse>
<graphite_rollup_alternative>
<version_column_name>Version</version_column_name>
<pattern>
<regexp>sum</regexp>
<function>any</function>
<retention>
<age>0</age>
<precision>600</precision>
</retention>
<retention>
<age>17280</age>
<precision>6000</precision>
</retention>
</pattern>
<default>
<function>any</function>
<retention>
<age>0</age>
<precision>600</precision>
</retention>
<retention>
<age>17280</age>
<precision>6000</precision>
</retention>
</default>
</graphite_rollup_alternative>
</clickhouse>

View File

@ -26,6 +26,7 @@ ln -sf $SRC_PATH/config.d/macros.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/secure_ports.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/clusters.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/graphite.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/graphite_alternative.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/database_atomic.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/max_concurrent_queries.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/merge_tree_settings.xml $DEST_SERVER_PATH/config.d/

View File

@ -1,3 +1,3 @@
metadata format version: 1\ndate column: \nsampling expression: \nindex granularity: 8192\nmode: 7\nsign column: sign\nprimary key: key1, key2\ndata format version: 1\npartition key: d\ngranularity bytes: 10485760\n
metadata format version: 1\ndate column: \nsampling expression: \nindex granularity: 8192\nmode: 7\nsign column: sign\nprimary key: key1, key2\ndata format version: 1\npartition key: d\ngranularity bytes: 10485760\nmerge parameters format version: 2\nversion column: version\n
1
1

View File

@ -1,2 +1,2 @@
CREATE TABLE default.x\n(\n `i` Int32,\n INDEX mm log2(i) TYPE minmax GRANULARITY 1,\n INDEX nn log2(i) TYPE minmax GRANULARITY 1,\n PROJECTION p\n (\n SELECT max(i)\n ),\n PROJECTION p2\n (\n SELECT min(i)\n )\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/default/x\', \'r\')\nORDER BY i\nSETTINGS index_granularity = 8192
metadata format version: 1\ndate column: \nsampling expression: \nindex granularity: 8192\nmode: 0\nsign column: \nprimary key: i\ndata format version: 1\npartition key: \nindices: mm log2(i) TYPE minmax GRANULARITY 1, nn log2(i) TYPE minmax GRANULARITY 1\nprojections: p (SELECT max(i)), p2 (SELECT min(i))\ngranularity bytes: 10485760\n
metadata format version: 1\ndate column: \nsampling expression: \nindex granularity: 8192\nmode: 0\nsign column: \nprimary key: i\ndata format version: 1\npartition key: \nindices: mm log2(i) TYPE minmax GRANULARITY 1, nn log2(i) TYPE minmax GRANULARITY 1\nprojections: p (SELECT max(i)), p2 (SELECT min(i))\ngranularity bytes: 10485760\nmerge parameters format version: 2\n

View File

@ -0,0 +1,80 @@
-- Tags: zookeeper, no-replicated-database
CREATE TABLE t
(
`id` UInt64,
`val` String,
`legacy_ver` UInt64,
)
ENGINE = ReplicatedReplacingMergeTree('/tables/{database}/t/', 'r1', legacy_ver)
ORDER BY id;
CREATE TABLE t_r
(
`id` UInt64,
`val` String,
`legacy_ver` UInt64
)
ENGINE = ReplicatedReplacingMergeTree('/tables/{database}/t/', 'r2')
ORDER BY id; -- { serverError METADATA_MISMATCH }
CREATE TABLE t2
(
`id` UInt64,
`val` String,
`legacy_ver` UInt64,
`deleted` UInt8
)
ENGINE = ReplicatedReplacingMergeTree('/tables/{database}/t2/', 'r1', legacy_ver)
ORDER BY id;
CREATE TABLE t2_r
(
`id` UInt64,
`val` String,
`legacy_ver` UInt64,
`deleted` UInt8
)
ENGINE = ReplicatedReplacingMergeTree('/tables/{database}/t2/', 'r2', legacy_ver, deleted)
ORDER BY id; -- { serverError METADATA_MISMATCH }
CREATE TABLE t3
(
`key` UInt64,
`metrics1` UInt64,
`metrics2` UInt64
)
ENGINE = ReplicatedSummingMergeTree('/tables/{database}/t3/', 'r1', metrics1)
ORDER BY key;
CREATE TABLE t3_r
(
`key` UInt64,
`metrics1` UInt64,
`metrics2` UInt64
)
ENGINE = ReplicatedSummingMergeTree('/tables/{database}/t3/', 'r2', metrics2)
ORDER BY key; -- { serverError METADATA_MISMATCH }
CREATE TABLE t4
(
`key` UInt32,
`Path` String,
`Time` DateTime('UTC'),
`Value` Float64,
`Version` UInt32,
`col` UInt64
)
ENGINE = ReplicatedGraphiteMergeTree('/tables/{database}/t4/', 'r1', 'graphite_rollup')
ORDER BY key;
CREATE TABLE t4_r
(
`key` UInt32,
`Path` String,
`Time` DateTime('UTC'),
`Value` Float64,
`Version` UInt32,
`col` UInt64
)
ENGINE = ReplicatedGraphiteMergeTree('/tables/{database}/t4/', 'r2', 'graphite_rollup_alternative')
ORDER BY key; -- { serverError METADATA_MISMATCH }