better way to deduplicate keys while creating sst files
Signed-off-by: Duc Canh Le <duccanh.le@ahrefs.com>
commit 98d606298b (parent 084f917bf8)
@@ -16,6 +16,7 @@
 #include <IO/WriteHelpers.h>
 #include <Interpreters/Context.h>
 #include <rocksdb/options.h>
 #include <rocksdb/slice.h>
 #include <rocksdb/status.h>
 #include <rocksdb/utilities/db_ttl.h>
 #include <Common/SipHash.h>
@@ -54,20 +55,22 @@ static rocksdb::Status buildSSTFile(const String & path, const ColumnString & ke
         return status;

     auto rows = perm.size();
-    for (size_t i = 0; i < rows; ++i)
+    for (size_t idx = 0; idx < rows;)
     {
-        auto row = perm[i];
+        /// We will write the last row of the same key
+        size_t next_idx = idx + 1;
+        while (next_idx < rows && keys.compareAt(perm[idx], perm[next_idx], keys, 1) == 0)
+            ++next_idx;
+
+        auto row = perm[next_idx - 1];
         status = sst_file_writer.Put(keys.getDataAt(row).toView(), values.getDataAt(row).toView());
-
-        /// There could be duplicated keys in chunk, thus Put may give IsInvalidArgument. This is ok, as we're certain that
-        /// keys are sorted in ascending order.
-        if (!status.ok() && !status.IsInvalidArgument())
+        if (!status.ok())
             return status;
+
+        idx = next_idx;
     }

-    sst_file_writer.Finish();
-    return rocksdb::Status::OK();
+    return sst_file_writer.Finish();
 }

 EmbeddedRocksDBBulkSink::EmbeddedRocksDBBulkSink(
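For readers less familiar with this pattern, here is a minimal standalone sketch of the deduplication idea, using plain std:: containers as stand-ins for ClickHouse's ColumnString and RocksDB's SstFileWriter (names and data are made up): rows are visited through a permutation sorted by key, each run of equal keys is collapsed in one step, and only the last row of the run is emitted, so every key is written exactly once and in ascending order.

// Sketch only: std::vector/std::pair replace ColumnString, std::cout replaces sst_file_writer.Put().
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <numeric>
#include <string>
#include <utility>
#include <vector>

int main()
{
    // Unsorted (key, value) rows with duplicate keys; later rows are newer.
    std::vector<std::pair<std::string, std::string>> rows
        = {{"b", "1"}, {"a", "2"}, {"a", "3"}, {"c", "4"}, {"b", "5"}};

    // Sort indices by key; stable_sort keeps insertion order inside each run of
    // equal keys, so the last index of a run points at the newest row.
    std::vector<size_t> perm(rows.size());
    std::iota(perm.begin(), perm.end(), 0);
    std::stable_sort(perm.begin(), perm.end(),
                     [&](size_t l, size_t r) { return rows[l].first < rows[r].first; });

    for (size_t idx = 0; idx < perm.size();)
    {
        // Skip to the end of the run of equal keys.
        size_t next_idx = idx + 1;
        while (next_idx < perm.size() && rows[perm[idx]].first == rows[perm[next_idx]].first)
            ++next_idx;

        // Emit only the last row of the run (stand-in for sst_file_writer.Put()).
        const auto & row = rows[perm[next_idx - 1]];
        std::cout << row.first << " -> " << row.second << '\n';  // prints: a -> 3, b -> 5, c -> 4

        idx = next_idx;
    }
}

Writing only the last row of each run keeps the upsert semantics (the newest value for a key wins) and means the sink no longer has to tolerate IsInvalidArgument from duplicate Put() calls, so any non-OK status can be treated as a real error.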
@@ -99,9 +102,9 @@ EmbeddedRocksDBBulkSink::~EmbeddedRocksDBBulkSink()
         if (fs::exists(insert_directory_queue))
             fs::remove_all(insert_directory_queue);
     }
-    catch (...)
+    catch(...)
     {
-        tryLogCurrentException(__PRETTY_FUNCTION__);
+        tryLogCurrentException(__PRETTY_FUNCTION__, fmt::format("Error while removing temporary directory {}:", insert_directory_queue));
     }
 }
@@ -204,7 +207,7 @@ void EmbeddedRocksDBBulkSink::consume(Chunk chunk_)
         throw Exception(ErrorCodes::ROCKSDB_ERROR, "RocksDB write error: {}", status.ToString());

     /// Ingest the SST file
-    rocksdb::IngestExternalFileOptions ingest_options;
+    static rocksdb::IngestExternalFileOptions ingest_options;
     ingest_options.move_files = true; /// The temporary file is on the same disk, so move (or hardlink) file will be faster than copy
     if (auto status = storage.rocksdb_ptr->IngestExternalFile({sst_file_path}, ingest_options); !status.ok())
         throw Exception(ErrorCodes::ROCKSDB_ERROR, "RocksDB write error: {}", status.ToString());
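For context on the ingestion step, a hedged sketch against the plain RocksDB C++ API (paths, keys, and the assert-based error handling are made up for illustration; this is not the ClickHouse sink): an SST file is built with rocksdb::SstFileWriter, the status of Finish() is propagated rather than ignored, and the finished file is handed to the database via IngestExternalFile with move_files = true, so a file on the same filesystem is moved or hard-linked instead of copied.

#include <cassert>
#include <string>
#include <utility>
#include <vector>

#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <rocksdb/sst_file_writer.h>

int main()
{
    const std::string db_path = "/tmp/rocksdb_bulk_demo";        // illustrative paths
    const std::string sst_path = "/tmp/rocksdb_bulk_demo.sst";

    rocksdb::Options options;
    options.create_if_missing = true;

    rocksdb::DB * db = nullptr;
    rocksdb::Status status = rocksdb::DB::Open(options, db_path, &db);
    assert(status.ok());

    // SstFileWriter requires keys in ascending order and only one Put() per key.
    rocksdb::SstFileWriter writer(rocksdb::EnvOptions(), options);
    status = writer.Open(sst_path);
    assert(status.ok());
    const std::vector<std::pair<std::string, std::string>> kvs
        = {{"key1", "v1"}, {"key2", "v2"}, {"key3", "v3"}};
    for (const auto & [key, value] : kvs)
    {
        status = writer.Put(key, value);
        assert(status.ok());
    }
    status = writer.Finish();   // Finish() can fail too, hence "return sst_file_writer.Finish();"
    assert(status.ok());

    // Ingest the finished file directly into the LSM tree, bypassing the memtable.
    rocksdb::IngestExternalFileOptions ingest_options;
    ingest_options.move_files = true;   // move/hardlink instead of copy when on the same disk
    status = db->IngestExternalFile({sst_path}, ingest_options);
    assert(status.ok());

    delete db;
}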
@@ -4,3 +4,6 @@
 1000
 2
 1000000
+1000
+0 999001
+1000000
tests/queries/0_stateless/02956_rocksdb_bulk_sink.sh (new executable file, 40 lines)
@@ -0,0 +1,40 @@
+#!/usr/bin/env bash
+# Tags: no-ordinary-database, use-rocksdb
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CURDIR"/../shell_config.sh
+
+# Normal importing, as we only insert 1000 rows, so it should be in memtable
+${CLICKHOUSE_CLIENT} --query "CREATE TABLE IF NOT EXISTS rocksdb_worm (key UInt64, value UInt64) ENGINE = EmbeddedRocksDB() PRIMARY KEY key SETTINGS optimize_for_bulk_insert = 0;"
+${CLICKHOUSE_CLIENT} --query "INSERT INTO rocksdb_worm SELECT number, number+1 FROM numbers(1000);"
+${CLICKHOUSE_CLIENT} --query "SELECT sum(value) FROM system.rocksdb WHERE database = currentDatabase() AND table = 'rocksdb_worm' AND name = 'no.file.opens';" # should be 0 because all data is still in memtable
+${CLICKHOUSE_CLIENT} --query "SELECT count() FROM rocksdb_worm;"
+
+# With bulk insertion, there is no memtable, so a small insert should create a new file
+${CLICKHOUSE_CLIENT} --query "ALTER TABLE rocksdb_worm MODIFY SETTING optimize_for_bulk_insert = 1;"
+${CLICKHOUSE_CLIENT} --query "TRUNCATE TABLE rocksdb_worm;"
+${CLICKHOUSE_CLIENT} --query "INSERT INTO rocksdb_worm SELECT number, number+1 FROM numbers(1000);"
+${CLICKHOUSE_CLIENT} --query "SELECT sum(value) FROM system.rocksdb WHERE database = currentDatabase() AND table = 'rocksdb_worm' AND name = 'no.file.opens';" # should be 1
+${CLICKHOUSE_CLIENT} --query "SELECT count() FROM rocksdb_worm;"
+
+# Testing insert with multiple sinks and fixed block size
+${CLICKHOUSE_CLIENT} --query "TRUNCATE TABLE rocksdb_worm;"
+${CLICKHOUSE_CLIENT} --query "ALTER TABLE rocksdb_worm MODIFY SETTING bulk_insert_block_size = 500000;"
+${CLICKHOUSE_CLIENT} --query "INSERT INTO rocksdb_worm SELECT number, number+1 FROM numbers_mt(1000000) SETTINGS max_insert_threads = 2, max_block_size = 100000;"
+${CLICKHOUSE_CLIENT} --query "SELECT sum(value) FROM system.rocksdb WHERE database = currentDatabase() AND table = 'rocksdb_worm' AND name = 'no.file.opens';" # should be 2 as max_block_size is set to 500000
+${CLICKHOUSE_CLIENT} --query "SELECT count() FROM rocksdb_worm;"
+
+# Testing insert with duplicated keys
+${CLICKHOUSE_CLIENT} --query "TRUNCATE TABLE rocksdb_worm;"
+${CLICKHOUSE_CLIENT} --query "INSERT INTO rocksdb_worm SELECT number % 1000, number+1 FROM numbers_mt(1000000) SETTINGS max_block_size = 100000, max_insert_threads = 1;"
+${CLICKHOUSE_CLIENT} --query "SELECT count() FROM rocksdb_worm;"
+${CLICKHOUSE_CLIENT} --query "SELECT * FROM rocksdb_worm WHERE key = 0;" # should be the latest value - 999001
+
+# Testing insert with multiple threads
+${CLICKHOUSE_CLIENT} --query "TRUNCATE TABLE rocksdb_worm;"
+${CLICKHOUSE_CLIENT} --query "INSERT INTO rocksdb_worm SELECT number, number+1 FROM numbers_mt(1000000)" &
+${CLICKHOUSE_CLIENT} --query "INSERT INTO rocksdb_worm SELECT number, number+1 FROM numbers_mt(1000000)" &
+wait
+${CLICKHOUSE_CLIENT} --query "SELECT count() FROM rocksdb_worm;"
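A quick sanity check on the duplicated-keys expectations above: key 0 is produced by the numbers 0, 1000, 2000, ..., 999000, the last of which is 999000, so the value that survives deduplication is 999000 + 1 = 999001; together with count() = 1000 distinct keys, this matches the "1000" and "0 999001" lines added to the reference file.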
@@ -1,22 +0,0 @@
--- Tags: no-ordinary-database, use-rocksdb
-
--- Normal importing, as we only insert 1000 rows, so it should be in memtable
-CREATE TABLE IF NOT EXISTS rocksdb_worm (key UInt64, value UInt64) ENGINE = EmbeddedRocksDB() PRIMARY KEY key SETTINGS optimize_for_bulk_insert = 0;
-INSERT INTO rocksdb_worm SELECT number, number+1 FROM numbers(1000);
-SELECT sum(value) FROM system.rocksdb WHERE database = currentDatabase() AND table = 'rocksdb_worm' AND name = 'no.file.opens'; -- should be 0 because all data is still in memtable
-SELECT count() FROM rocksdb_worm;
-
--- With bulk insertion, there is no memtable, so a small insert should create a new file
-ALTER TABLE rocksdb_worm MODIFY SETTING optimize_for_bulk_insert = 1;
-TRUNCATE TABLE rocksdb_worm;
-INSERT INTO rocksdb_worm SELECT number, number+1 FROM numbers(1000);
-SELECT sum(value) FROM system.rocksdb WHERE database = currentDatabase() AND table = 'rocksdb_worm' AND name = 'no.file.opens'; -- should be 1
-SELECT count() FROM rocksdb_worm;
-
--- Testing insert with multiple sinks and fixed block size
-TRUNCATE TABLE rocksdb_worm;
-ALTER TABLE rocksdb_worm MODIFY SETTING bulk_insert_block_size = 500000;
-INSERT INTO rocksdb_worm SELECT number, number+1 FROM numbers_mt(1000000) SETTINGS max_insert_threads = 2, max_block_size = 100000;
-SELECT sum(value) FROM system.rocksdb WHERE database = currentDatabase() AND table = 'rocksdb_worm' AND name = 'no.file.opens'; -- should be 2 as max_block_size is set to 500000
-SELECT count() FROM rocksdb_worm;