From 98d606298b4221af8d33bbea1db4171d17595165 Mon Sep 17 00:00:00 2001
From: Duc Canh Le
Date: Mon, 22 Apr 2024 10:01:44 +0000
Subject: [PATCH] better way to deduplicate keys while creating sst files

Signed-off-by: Duc Canh Le
---
 .../RocksDB/EmbeddedRocksDBBulkSink.cpp       | 25 +++++++-----
 .../02956_rocksdb_bulk_sink.reference         |  3 ++
 .../0_stateless/02956_rocksdb_bulk_sink.sh    | 40 +++++++++++++++++++
 .../0_stateless/02956_rocksdb_bulk_sink.sql   | 22 ----------
 4 files changed, 57 insertions(+), 33 deletions(-)
 create mode 100755 tests/queries/0_stateless/02956_rocksdb_bulk_sink.sh
 delete mode 100644 tests/queries/0_stateless/02956_rocksdb_bulk_sink.sql

diff --git a/src/Storages/RocksDB/EmbeddedRocksDBBulkSink.cpp b/src/Storages/RocksDB/EmbeddedRocksDBBulkSink.cpp
index dbaa5a8afea..b58e0c5eb4a 100644
--- a/src/Storages/RocksDB/EmbeddedRocksDBBulkSink.cpp
+++ b/src/Storages/RocksDB/EmbeddedRocksDBBulkSink.cpp
@@ -16,6 +16,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -54,20 +55,22 @@ static rocksdb::Status buildSSTFile(const String & path, const ColumnString & ke
         return status;

     auto rows = perm.size();
-    for (size_t i = 0; i < rows; ++i)
+    for (size_t idx = 0; idx < rows;)
     {
-        auto row = perm[i];
+        /// Keys are sorted, so duplicates are adjacent; write only the last row of each key
+        size_t next_idx = idx + 1;
+        while (next_idx < rows && keys.compareAt(perm[idx], perm[next_idx], keys, 1) == 0)
+            ++next_idx;
+        auto row = perm[next_idx - 1];
         status = sst_file_writer.Put(keys.getDataAt(row).toView(), values.getDataAt(row).toView());
-
-        /// There could be duplicated keys in chunk, thus Put may give IsInvalidArgument. This is ok, as we're certain that
-        /// keys are sorted in ascending order.
-        if (!status.ok() && !status.IsInvalidArgument())
+        if (!status.ok())
             return status;
+
+        idx = next_idx;
     }

-    sst_file_writer.Finish();
-    return rocksdb::Status::OK();
+    return sst_file_writer.Finish();
 }

 EmbeddedRocksDBBulkSink::EmbeddedRocksDBBulkSink(
@@ -99,9 +102,9 @@ EmbeddedRocksDBBulkSink::~EmbeddedRocksDBBulkSink()
         if (fs::exists(insert_directory_queue))
            fs::remove_all(insert_directory_queue);
     }
     catch (...)
     {
-        tryLogCurrentException(__PRETTY_FUNCTION__);
+        tryLogCurrentException(__PRETTY_FUNCTION__, fmt::format("Error while removing temporary directory {}:", insert_directory_queue));
     }
 }

@@ -204,7 +207,7 @@ void EmbeddedRocksDBBulkSink::consume(Chunk chunk_)
         throw Exception(ErrorCodes::ROCKSDB_ERROR, "RocksDB write error: {}", status.ToString());

     /// Ingest the SST file
-    rocksdb::IngestExternalFileOptions ingest_options;
+    static rocksdb::IngestExternalFileOptions ingest_options;
     ingest_options.move_files = true; /// The temporary file is on the same disk, so move (or hardlink) file will be faster than copy
     if (auto status = storage.rocksdb_ptr->IngestExternalFile({sst_file_path}, ingest_options); !status.ok())
         throw Exception(ErrorCodes::ROCKSDB_ERROR, "RocksDB write error: {}", status.ToString());
diff --git a/tests/queries/0_stateless/02956_rocksdb_bulk_sink.reference b/tests/queries/0_stateless/02956_rocksdb_bulk_sink.reference
index f8cd87238a8..dcf8a322ed5 100644
--- a/tests/queries/0_stateless/02956_rocksdb_bulk_sink.reference
+++ b/tests/queries/0_stateless/02956_rocksdb_bulk_sink.reference
@@ -4,3 +4,6 @@
 1000
 2
 1000000
+1000
+0 999001
+1000000
diff --git a/tests/queries/0_stateless/02956_rocksdb_bulk_sink.sh b/tests/queries/0_stateless/02956_rocksdb_bulk_sink.sh
new file mode 100755
index 00000000000..9f771b0fcb4
--- /dev/null
+++ b/tests/queries/0_stateless/02956_rocksdb_bulk_sink.sh
@@ -0,0 +1,40 @@
+#!/usr/bin/env bash
+# Tags: no-ordinary-database, use-rocksdb
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CURDIR"/../shell_config.sh
+
+# Normal insert: only 1000 rows, so all data should stay in the memtable
+${CLICKHOUSE_CLIENT} --query "CREATE TABLE IF NOT EXISTS rocksdb_worm (key UInt64, value UInt64) ENGINE = EmbeddedRocksDB() PRIMARY KEY key SETTINGS optimize_for_bulk_insert = 0;"
+${CLICKHOUSE_CLIENT} --query "INSERT INTO rocksdb_worm SELECT number, number+1 FROM numbers(1000);"
+${CLICKHOUSE_CLIENT} --query "SELECT sum(value) FROM system.rocksdb WHERE database = currentDatabase() AND table = 'rocksdb_worm' AND name = 'no.file.opens';" # should be 0 because all data is still in the memtable
+${CLICKHOUSE_CLIENT} --query "SELECT count() FROM rocksdb_worm;"
+
+# With bulk insertion there is no memtable, so even a small insert should create a new file
+${CLICKHOUSE_CLIENT} --query "ALTER TABLE rocksdb_worm MODIFY SETTING optimize_for_bulk_insert = 1;"
+${CLICKHOUSE_CLIENT} --query "TRUNCATE TABLE rocksdb_worm;"
+${CLICKHOUSE_CLIENT} --query "INSERT INTO rocksdb_worm SELECT number, number+1 FROM numbers(1000);"
+${CLICKHOUSE_CLIENT} --query "SELECT sum(value) FROM system.rocksdb WHERE database = currentDatabase() AND table = 'rocksdb_worm' AND name = 'no.file.opens';" # should be 1
+${CLICKHOUSE_CLIENT} --query "SELECT count() FROM rocksdb_worm;"
+
+# Testing insert with multiple sinks and fixed block size
+${CLICKHOUSE_CLIENT} --query "TRUNCATE TABLE rocksdb_worm;"
+${CLICKHOUSE_CLIENT} --query "ALTER TABLE rocksdb_worm MODIFY SETTING bulk_insert_block_size = 500000;"
+${CLICKHOUSE_CLIENT} --query "INSERT INTO rocksdb_worm SELECT number, number+1 FROM numbers_mt(1000000) SETTINGS max_insert_threads = 2, max_block_size = 100000;"
+${CLICKHOUSE_CLIENT} --query "SELECT sum(value) FROM system.rocksdb WHERE database = currentDatabase() AND table = 'rocksdb_worm' AND name = 'no.file.opens';" # should be 2 because bulk_insert_block_size is set to 500000
+${CLICKHOUSE_CLIENT} --query "SELECT count() FROM rocksdb_worm;"
+
+# Testing insert with duplicated keys
+${CLICKHOUSE_CLIENT} --query "TRUNCATE TABLE rocksdb_worm;"
+${CLICKHOUSE_CLIENT} --query "INSERT INTO rocksdb_worm SELECT number % 1000, number+1 FROM numbers_mt(1000000) SETTINGS max_block_size = 100000, max_insert_threads = 1;"
+${CLICKHOUSE_CLIENT} --query "SELECT count() FROM rocksdb_worm;"
+${CLICKHOUSE_CLIENT} --query "SELECT * FROM rocksdb_worm WHERE key = 0;" # should be the latest value: 999001
+
+# Testing insert with multiple threads
+${CLICKHOUSE_CLIENT} --query "TRUNCATE TABLE rocksdb_worm;"
+${CLICKHOUSE_CLIENT} --query "INSERT INTO rocksdb_worm SELECT number, number+1 FROM numbers_mt(1000000);" &
+${CLICKHOUSE_CLIENT} --query "INSERT INTO rocksdb_worm SELECT number, number+1 FROM numbers_mt(1000000);" &
+wait
+${CLICKHOUSE_CLIENT} --query "SELECT count() FROM rocksdb_worm;"
+
diff --git a/tests/queries/0_stateless/02956_rocksdb_bulk_sink.sql b/tests/queries/0_stateless/02956_rocksdb_bulk_sink.sql
deleted file mode 100644
index bfe1c3eaceb..00000000000
--- a/tests/queries/0_stateless/02956_rocksdb_bulk_sink.sql
+++ /dev/null
@@ -1,22 +0,0 @@
--- Tags: no-ordinary-database, use-rocksdb
-
--- Normal importing, as we only insert 1000 rows, so it should be in memtable
-CREATE TABLE IF NOT EXISTS rocksdb_worm (key UInt64, value UInt64) ENGINE = EmbeddedRocksDB() PRIMARY KEY key SETTINGS optimize_for_bulk_insert = 0;
-INSERT INTO rocksdb_worm SELECT number, number+1 FROM numbers(1000);
-SELECT sum(value) FROM system.rocksdb WHERE database = currentDatabase() AND table = 'rocksdb_worm' AND name = 'no.file.opens'; -- should be 0 because all data is still in memtable
-SELECT count() FROM rocksdb_worm;
-
--- With bulk insertion, there is no memtable, so a small insert should create a new file
-ALTER TABLE rocksdb_worm MODIFY SETTING optimize_for_bulk_insert = 1;
-TRUNCATE TABLE rocksdb_worm;
-INSERT INTO rocksdb_worm SELECT number, number+1 FROM numbers(1000);
-SELECT sum(value) FROM system.rocksdb WHERE database = currentDatabase() AND table = 'rocksdb_worm' AND name = 'no.file.opens'; -- should be 1
-SELECT count() FROM rocksdb_worm;
-
--- Testing insert with multiple sinks and fixed block size
-TRUNCATE TABLE rocksdb_worm;
-ALTER TABLE rocksdb_worm MODIFY SETTING bulk_insert_block_size = 500000;
-INSERT INTO rocksdb_worm SELECT number, number+1 FROM numbers_mt(1000000) SETTINGS max_insert_threads = 2, max_block_size = 100000;
-SELECT sum(value) FROM system.rocksdb WHERE database = currentDatabase() AND table = 'rocksdb_worm' AND name = 'no.file.opens'; -- should be 2 as max_block_size is set to 500000
-SELECT count() FROM rocksdb_worm;
-
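Note for reviewers: a minimal, self-contained sketch of the deduplication idea behind the new loop in buildSSTFile. Rows are ordered by key through a sorted permutation, so duplicate keys form adjacent runs, and only the last row of each run is written. The names here (Row, emitLastRowPerKey) and the explicit stable sort are hypothetical stand-ins for illustration, not ClickHouse code.

// Sketch of "keep the last row per key among adjacent duplicates".
#include <algorithm>
#include <iostream>
#include <numeric>
#include <string>
#include <vector>

struct Row
{
    std::string key;
    std::string value;
};

void emitLastRowPerKey(const std::vector<Row> & rows)
{
    // Build a permutation sorted by key. A stable sort keeps equal keys in
    // insertion order, so the last element of each run is the latest insert.
    std::vector<size_t> perm(rows.size());
    std::iota(perm.begin(), perm.end(), 0);
    std::stable_sort(perm.begin(), perm.end(), [&](size_t a, size_t b) { return rows[a].key < rows[b].key; });

    for (size_t idx = 0; idx < perm.size();)
    {
        // Advance to one past the last row with the same key.
        size_t next_idx = idx + 1;
        while (next_idx < perm.size() && rows[perm[idx]].key == rows[perm[next_idx]].key)
            ++next_idx;

        // Emit only the last row of the run; in the patch this is the single
        // Put into the SST file for this key.
        const Row & row = rows[perm[next_idx - 1]];
        std::cout << row.key << " -> " << row.value << '\n';

        idx = next_idx;
    }
}

int main()
{
    // Key "a" appears twice; only its latest value is emitted.
    emitLastRowPerKey({{"a", "1"}, {"b", "2"}, {"a", "3"}}); // prints: a -> 3, b -> 2
}

As the deleted comment in buildSSTFile notes, sst_file_writer.Put rejects a duplicate of the previous key with IsInvalidArgument; silently ignoring that error effectively kept the first row of each run. Deduplicating before Put keeps the last row instead, matching RocksDB's last-write-wins semantics, which is what the new `0 999001` case in the reference file asserts.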