Merge pull request #48293 from ClickHouse/keeper-map-strict-mode

Add strict mode for KeeperMap
This commit is contained in:
Antonio Andelic 2023-04-04 09:57:01 +02:00 committed by GitHub
commit d0ba1e76bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 234 additions and 22 deletions

View File

@ -78,7 +78,8 @@ Of course, it's possible to manually run `CREATE TABLE` with same path on nonrel
### Inserts
When new rows are inserted into `KeeperMap`, if the key already exists, the value will be updated, otherwise new key is created.
When new rows are inserted into `KeeperMap`, if the key does not exist, a new entry for the key is created.
If the key exists, and setting `keeper_map_strict_mode` is set to `true`, an exception is thrown, otherwise, the value for the key is overwritten.
Example:
@ -89,6 +90,7 @@ INSERT INTO keeper_map_table VALUES ('some key', 1, 'value', 3.2);
### Deletes
Rows can be deleted using `DELETE` query or `TRUNCATE`.
If the key exists, and setting `keeper_map_strict_mode` is set to `true`, fetching and deleting data will succeed only if it can be executed atomically.
```sql
DELETE FROM keeper_map_table WHERE key LIKE 'some%' AND v1 > 1;
@ -105,6 +107,7 @@ TRUNCATE TABLE keeper_map_table;
### Updates
Values can be updated using `ALTER TABLE` query. Primary key cannot be updated.
If setting `keeper_map_strict_mode` is set to `true`, fetching and updating data will succeed only if it's executed atomically.
```sql
ALTER TABLE keeper_map_table UPDATE v1 = v1 * 10 + 2 WHERE key LIKE 'some%' AND v3 > 3.1;

View File

@ -724,6 +724,7 @@ class IColumn;
M(Bool, force_aggregation_in_order, false, "Force use of aggregation in order on remote nodes during distributed aggregation. PLEASE, NEVER CHANGE THIS SETTING VALUE MANUALLY!", IMPORTANT) \
M(UInt64, http_max_request_param_data_size, 10_MiB, "Limit on size of request data used as a query parameter in predefined HTTP requests.", 0) \
M(Bool, allow_experimental_undrop_table_query, false, "Allow to use undrop query to restore dropped table in a limited time", 0) \
M(Bool, keeper_map_strict_mode, false, "Enforce additional checks during operations on KeeperMap. E.g. throw an exception on an insert for already existing key", 0) \
M(Bool, function_json_value_return_type_allow_nullable, false, "Allow function to return nullable type.", 0) \
M(Bool, function_json_value_return_type_allow_complex, false, "Allow function to return complex type, such as: struct, array, map.", 0) \
// End of COMMON_SETTINGS

View File

@ -550,6 +550,12 @@ void MutationsInterpreter::prepare(bool dry_run)
if (source.hasLightweightDeleteMask())
all_columns.push_back({LightweightDeleteDescription::FILTER_COLUMN});
if (return_all_columns)
{
for (const auto & column : source.getStorage()->getVirtuals())
all_columns.push_back(column);
}
NameSet updated_columns;
bool materialize_ttl_recalculate_only = source.materializeTTLRecalculateOnly();
@ -906,6 +912,8 @@ void MutationsInterpreter::prepareMutationStages(std::vector<Stage> & prepared_s
{
auto storage_snapshot = source.getStorageSnapshot(metadata_snapshot, context);
auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical).withExtendedObjects();
if (return_all_columns)
options.withVirtuals();
auto all_columns = storage_snapshot->getColumns(options);
/// Add _row_exists column if it is present in the part
@ -1256,6 +1264,7 @@ void MutationsInterpreter::validate()
}
QueryPlan plan;
initQueryPlan(stages.front(), plan);
auto pipeline = addStreamsForLaterStages(stages, plan);
}

View File

@ -59,6 +59,8 @@ namespace ErrorCodes
namespace
{
constexpr std::string_view version_column_name = "_version";
std::string formattedAST(const ASTPtr & ast)
{
if (!ast)
@ -77,7 +79,6 @@ void verifyTableId(const StorageID & table_id)
table_id.getDatabaseName(),
database->getEngineName());
}
}
}
@ -86,11 +87,13 @@ class StorageKeeperMapSink : public SinkToStorage
{
StorageKeeperMap & storage;
std::unordered_map<std::string, std::string> new_values;
std::unordered_map<std::string, int32_t> versions;
size_t primary_key_pos;
ContextPtr context;
public:
StorageKeeperMapSink(StorageKeeperMap & storage_, const StorageMetadataPtr & metadata_snapshot)
: SinkToStorage(metadata_snapshot->getSampleBlock()), storage(storage_)
StorageKeeperMapSink(StorageKeeperMap & storage_, Block header, ContextPtr context_)
: SinkToStorage(header), storage(storage_), context(std::move(context_))
{
auto primary_key = storage.getPrimaryKey();
assert(primary_key.size() == 1);
@ -113,18 +116,36 @@ public:
wb_value.restart();
size_t idx = 0;
int32_t version = -1;
for (const auto & elem : block)
{
if (elem.name == version_column_name)
{
version = assert_cast<const ColumnVector<Int32> &>(*elem.column).getData()[i];
continue;
}
elem.type->getDefaultSerialization()->serializeBinary(*elem.column, i, idx == primary_key_pos ? wb_key : wb_value, {});
++idx;
}
auto key = base64Encode(wb_key.str(), /* url_encoding */ true);
if (version != -1)
versions[key] = version;
new_values[std::move(key)] = std::move(wb_value.str());
}
}
void onFinish() override
{
finalize<false>(/*strict*/ context->getSettingsRef().keeper_map_strict_mode);
}
template <bool for_update>
void finalize(bool strict)
{
auto zookeeper = storage.getClient();
@ -147,21 +168,39 @@ public:
for (const auto & [key, _] : new_values)
key_paths.push_back(storage.fullPathForKey(key));
auto results = zookeeper->exists(key_paths);
zkutil::ZooKeeper::MultiExistsResponse results;
if constexpr (!for_update)
{
if (!strict)
results = zookeeper->exists(key_paths);
}
Coordination::Requests requests;
requests.reserve(key_paths.size());
for (size_t i = 0; i < key_paths.size(); ++i)
{
auto key = fs::path(key_paths[i]).filename();
if (results[i].error == Coordination::Error::ZOK)
if constexpr (for_update)
{
requests.push_back(zkutil::makeSetRequest(key_paths[i], new_values[key], -1));
int32_t version = -1;
if (strict)
version = versions.at(key);
requests.push_back(zkutil::makeSetRequest(key_paths[i], new_values[key], version));
}
else
{
requests.push_back(zkutil::makeCreateRequest(key_paths[i], new_values[key], zkutil::CreateMode::Persistent));
++new_keys_num;
if (!strict && results[i].error == Coordination::Error::ZOK)
{
requests.push_back(zkutil::makeSetRequest(key_paths[i], new_values[key], -1));
}
else
{
requests.push_back(zkutil::makeCreateRequest(key_paths[i], new_values[key], zkutil::CreateMode::Persistent));
++new_keys_num;
}
}
}
@ -193,6 +232,18 @@ class StorageKeeperMapSource : public ISource
KeyContainerIter it;
KeyContainerIter end;
bool with_version_column = false;
static Block getHeader(Block header, bool with_version_column)
{
if (with_version_column)
header.insert(
{DataTypeInt32{}.createColumn(),
std::make_shared<DataTypeInt32>(), std::string{version_column_name}});
return header;
}
public:
StorageKeeperMapSource(
const StorageKeeperMap & storage_,
@ -200,8 +251,10 @@ public:
size_t max_block_size_,
KeyContainerPtr container_,
KeyContainerIter begin_,
KeyContainerIter end_)
: ISource(header), storage(storage_), max_block_size(max_block_size_), container(std::move(container_)), it(begin_), end(end_)
KeyContainerIter end_,
bool with_version_column_)
: ISource(getHeader(header, with_version_column_)), storage(storage_), max_block_size(max_block_size_), container(std::move(container_)), it(begin_), end(end_)
, with_version_column(with_version_column_)
{
}
@ -225,12 +278,12 @@ public:
for (auto & raw_key : raw_keys)
raw_key = base64Encode(raw_key, /* url_encoding */ true);
return storage.getBySerializedKeys(raw_keys, nullptr);
return storage.getBySerializedKeys(raw_keys, nullptr, with_version_column);
}
else
{
size_t elem_num = std::min(max_block_size, static_cast<size_t>(end - it));
auto chunk = storage.getBySerializedKeys(std::span{it, it + elem_num}, nullptr);
auto chunk = storage.getBySerializedKeys(std::span{it, it + elem_num}, nullptr, with_version_column);
it += elem_num;
return chunk;
}
@ -426,6 +479,16 @@ Pipe StorageKeeperMap::read(
auto primary_key_type = sample_block.getByName(primary_key).type;
std::tie(filtered_keys, all_scan) = getFilterKeys(primary_key, primary_key_type, query_info, context_);
bool with_version_column = false;
for (const auto & column : column_names)
{
if (column == version_column_name)
{
with_version_column = true;
break;
}
}
const auto process_keys = [&]<typename KeyContainerPtr>(KeyContainerPtr keys) -> Pipe
{
if (keys->empty())
@ -449,7 +512,7 @@ Pipe StorageKeeperMap::read(
using KeyContainer = typename KeyContainerPtr::element_type;
pipes.emplace_back(std::make_shared<StorageKeeperMapSource<KeyContainer>>(
*this, sample_block, max_block_size, keys, keys->begin() + begin, keys->begin() + end));
*this, sample_block, max_block_size, keys, keys->begin() + begin, keys->begin() + end, with_version_column));
}
return Pipe::unitePipes(std::move(pipes));
};
@ -461,10 +524,10 @@ Pipe StorageKeeperMap::read(
return process_keys(std::move(filtered_keys));
}
SinkToStoragePtr StorageKeeperMap::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/)
SinkToStoragePtr StorageKeeperMap::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
{
checkTable<true>();
return std::make_shared<StorageKeeperMapSink>(*this, metadata_snapshot);
return std::make_shared<StorageKeeperMapSink>(*this, metadata_snapshot->getSampleBlock(), local_context);
}
void StorageKeeperMap::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &)
@ -554,6 +617,12 @@ void StorageKeeperMap::drop()
dropTable(client, metadata_drop_lock);
}
NamesAndTypesList StorageKeeperMap::getVirtuals() const
{
return NamesAndTypesList{
{std::string{version_column_name}, std::make_shared<DataTypeInt32>()}};
}
zkutil::ZooKeeperPtr StorageKeeperMap::getClient() const
{
std::lock_guard lock{zookeeper_mutex};
@ -670,13 +739,18 @@ Chunk StorageKeeperMap::getByKeys(const ColumnsWithTypeAndName & keys, PaddedPOD
if (raw_keys.size() != keys[0].column->size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Assertion failed: {} != {}", raw_keys.size(), keys[0].column->size());
return getBySerializedKeys(raw_keys, &null_map);
return getBySerializedKeys(raw_keys, &null_map, /* version_column */ false);
}
Chunk StorageKeeperMap::getBySerializedKeys(const std::span<const std::string> keys, PaddedPODArray<UInt8> * null_map) const
Chunk StorageKeeperMap::getBySerializedKeys(const std::span<const std::string> keys, PaddedPODArray<UInt8> * null_map, bool with_version) const
{
Block sample_block = getInMemoryMetadataPtr()->getSampleBlock();
MutableColumns columns = sample_block.cloneEmptyColumns();
MutableColumnPtr version_column = nullptr;
if (with_version)
version_column = ColumnVector<Int32>::create();
size_t primary_key_pos = getPrimaryKeyPos(sample_block, getPrimaryKey());
if (null_map)
@ -706,6 +780,9 @@ Chunk StorageKeeperMap::getBySerializedKeys(const std::span<const std::string> k
if (code == Coordination::Error::ZOK)
{
fillColumns(base64Decode(keys[i], true), response.data, primary_key_pos, sample_block, columns);
if (version_column)
version_column->insert(response.stat.version);
}
else if (code == Coordination::Error::ZNONODE)
{
@ -714,6 +791,9 @@ Chunk StorageKeeperMap::getBySerializedKeys(const std::span<const std::string> k
(*null_map)[i] = 0;
for (size_t col_idx = 0; col_idx < sample_block.columns(); ++col_idx)
columns[col_idx]->insert(sample_block.getByPosition(col_idx).type->getDefault());
if (version_column)
version_column->insert(-1);
}
}
else
@ -723,6 +803,10 @@ Chunk StorageKeeperMap::getBySerializedKeys(const std::span<const std::string> k
}
size_t num_rows = columns.at(0)->size();
if (version_column)
columns.push_back(std::move(version_column));
return Chunk(std::move(columns), num_rows);
}
@ -763,6 +847,8 @@ void StorageKeeperMap::mutate(const MutationCommands & commands, ContextPtr loca
if (commands.empty())
return;
bool strict = local_context->getSettingsRef().keeper_map_strict_mode;
assert(commands.size() == 1);
auto metadata_snapshot = getInMemoryMetadataPtr();
@ -784,8 +870,10 @@ void StorageKeeperMap::mutate(const MutationCommands & commands, ContextPtr loca
auto header = interpreter->getUpdatedHeader();
auto primary_key_pos = header.getPositionByName(primary_key);
auto version_position = header.getPositionByName(std::string{version_column_name});
auto client = getClient();
Block block;
while (executor.pull(block))
{
@ -793,14 +881,23 @@ void StorageKeeperMap::mutate(const MutationCommands & commands, ContextPtr loca
auto column = column_type_name.column;
auto size = column->size();
WriteBufferFromOwnString wb_key;
Coordination::Requests delete_requests;
for (size_t i = 0; i < size; ++i)
{
int32_t version = -1;
if (strict)
{
const auto & version_column = block.getByPosition(version_position).column;
version = assert_cast<const ColumnVector<Int32> &>(*version_column).getData()[i];
}
wb_key.restart();
column_type_name.type->getDefaultSerialization()->serializeBinary(*column, i, wb_key, {});
delete_requests.emplace_back(zkutil::makeRemoveRequest(fullPathForKey(base64Encode(wb_key.str(), true)), -1));
delete_requests.emplace_back(zkutil::makeRemoveRequest(fullPathForKey(base64Encode(wb_key.str(), true)), version));
}
Coordination::Responses responses;
@ -834,12 +931,13 @@ void StorageKeeperMap::mutate(const MutationCommands & commands, ContextPtr loca
auto pipeline = QueryPipelineBuilder::getPipeline(interpreter->execute());
PullingPipelineExecutor executor(pipeline);
auto sink = std::make_shared<StorageKeeperMapSink>(*this, metadata_snapshot);
auto sink = std::make_shared<StorageKeeperMapSink>(*this, executor.getHeader(), local_context);
Block block;
while (executor.pull(block))
sink->consume(Chunk{block.getColumns(), block.rows()});
sink->onFinish();
sink->finalize<true>(strict);
}
namespace

View File

@ -46,11 +46,13 @@ public:
void truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &) override;
void drop() override;
NamesAndTypesList getVirtuals() const override;
std::string getName() const override { return "KeeperMap"; }
Names getPrimaryKey() const override { return {primary_key}; }
Chunk getByKeys(const ColumnsWithTypeAndName & keys, PaddedPODArray<UInt8> & null_map, const Names &) const override;
Chunk getBySerializedKeys(std::span<const std::string> keys, PaddedPODArray<UInt8> * null_map) const;
Chunk getBySerializedKeys(std::span<const std::string> keys, PaddedPODArray<UInt8> * null_map, bool with_version) const;
Block getSampleBlock(const Names &) const override;

View File

@ -0,0 +1,3 @@
1 1.1
1 2.1
1 2.1

View File

@ -0,0 +1,20 @@
-- Tags: no-ordinary-database, no-fasttest
DROP TABLE IF EXISTS 02706_keeper_map_insert_strict SYNC;
CREATE TABLE 02706_keeper_map_insert_strict (key UInt64, value Float64) Engine=KeeperMap('/' || currentDatabase() || '/test_02706_keeper_map_insert_strict') PRIMARY KEY(key);
INSERT INTO 02706_keeper_map_insert_strict VALUES (1, 1.1), (2, 2.2);
SELECT * FROM 02706_keeper_map_insert_strict WHERE key = 1;
SET keeper_map_strict_mode = false;
INSERT INTO 02706_keeper_map_insert_strict VALUES (1, 2.1);
SELECT * FROM 02706_keeper_map_insert_strict WHERE key = 1;
SET keeper_map_strict_mode = true;
INSERT INTO 02706_keeper_map_insert_strict VALUES (1, 2.1); -- { serverError KEEPER_EXCEPTION }
SELECT * FROM 02706_keeper_map_insert_strict WHERE key = 1;
DROP TABLE 02706_keeper_map_insert_strict;

View File

@ -0,0 +1,32 @@
1 Some string 0
2 Some other string 0
3 random 0
4 random2 0
-----------
3 random 0
4 random2 0
-----------
3 random 0
-----------
0
-----------
1 String 10
2 String 20
3 String 30
4 String 40
-----------
1 String 10
2 String 20
3 Another 30
4 Another 40
-----------
1 String 10
2 String 20
3 Another 30
4 Another 40
-----------
1 String 102
2 String 202
3 Another 302
4 Another 402
-----------

View File

@ -0,0 +1,44 @@
-- Tags: no-ordinary-database, no-fasttest
DROP TABLE IF EXISTS 02707_keepermap_delete_update;
SET keeper_map_strict_mode = 1;
CREATE TABLE 02707_keepermap_delete_update (key UInt64, value String, value2 UInt64) ENGINE=KeeperMap('/' || currentDatabase() || '/test02707_keepermap_delete_update') PRIMARY KEY(key);
INSERT INTO 02707_keepermap_delete_update VALUES (1, 'Some string', 0), (2, 'Some other string', 0), (3, 'random', 0), (4, 'random2', 0);
SELECT * FROM 02707_keepermap_delete_update ORDER BY key;
SELECT '-----------';
DELETE FROM 02707_keepermap_delete_update WHERE value LIKE 'Some%string';
SELECT * FROM 02707_keepermap_delete_update ORDER BY key;
SELECT '-----------';
ALTER TABLE 02707_keepermap_delete_update DELETE WHERE key >= 4;
SELECT * FROM 02707_keepermap_delete_update ORDER BY key;
SELECT '-----------';
DELETE FROM 02707_keepermap_delete_update WHERE 1 = 1;
SELECT count() FROM 02707_keepermap_delete_update;
SELECT '-----------';
INSERT INTO 02707_keepermap_delete_update VALUES (1, 'String', 10), (2, 'String', 20), (3, 'String', 30), (4, 'String', 40);
SELECT * FROM 02707_keepermap_delete_update ORDER BY key;
SELECT '-----------';
ALTER TABLE 02707_keepermap_delete_update UPDATE value = 'Another' WHERE key > 2;
SELECT * FROM 02707_keepermap_delete_update ORDER BY key;
SELECT '-----------';
ALTER TABLE 02707_keepermap_delete_update UPDATE key = key * 10 WHERE 1 = 1; -- { serverError 36 }
SELECT * FROM 02707_keepermap_delete_update ORDER BY key;
SELECT '-----------';
ALTER TABLE 02707_keepermap_delete_update UPDATE value2 = value2 * 10 + 2 WHERE value2 < 100;
SELECT * FROM 02707_keepermap_delete_update ORDER BY key;
SELECT '-----------';
DROP TABLE IF EXISTS 02707_keepermap_delete_update;