Allow cache over azure blob storage, add tests
parent a6655bea1c
commit 274457e5cf
Dockerfile for the stateless test image:

@@ -17,6 +17,8 @@ RUN apt-get update -y \
    mysql-client=8.0* \
    ncdu \
    netcat-openbsd \
+   nodejs \
+   npm \
    openjdk-11-jre-headless \
    openssl \
    postgresql-client \
@@ -74,6 +76,8 @@ ENV MINIO_ROOT_USER="clickhouse"
ENV MINIO_ROOT_PASSWORD="clickhouse"
ENV EXPORT_S3_STORAGE_POLICIES=1

+ RUN npm install -g azurite
+
COPY run.sh /
COPY setup_minio.sh /
COPY setup_hdfs_minicluster.sh /
run.sh for the stateless test image:

@@ -20,6 +20,7 @@ ln -s /usr/share/clickhouse-test/clickhouse-test /usr/bin/clickhouse-test

./setup_minio.sh stateless
./setup_hdfs_minicluster.sh
+ azurite-blob --blobHost 0.0.0.0 --blobPort 10000 --debug /azurite_log &

# For flaky check we also enable thread fuzzer
if [ "$NUM_TRIES" -gt "1" ]; then
CachedOnDiskWriteBufferFromFile.cpp:

@@ -133,24 +133,33 @@ void CachedOnDiskWriteBufferFromFile::appendFilesystemCacheLog(const FileSegment
    }
}

- void CachedOnDiskWriteBufferFromFile::finalizeImpl()
+ void CachedOnDiskWriteBufferFromFile::preFinalize()
{
    try
    {
        next();
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);

        if (cache_writer)
            cache_writer->finalize();

        throw;
    }

    if (cache_writer)
    {
        cache_writer->finalize();
        cache_writer.reset();
    }
}

/// void CachedOnDiskWriteBufferFromFile::finalizeImpl()
/// {
///     // try
///     // {
///     //     next();
///     // }
///     // catch (...)
///     // {
///     //     tryLogCurrentException(__PRETTY_FUNCTION__);
///
///     //     if (cache_writer)
///     //         cache_writer->finalize();
///
///     //     throw;
///     // }
///
///     if (cache_writer)
///         cache_writer->finalize();
/// }

}
CachedOnDiskWriteBufferFromFile.h:

@@ -29,7 +29,8 @@ public:

    void nextImpl() override;

-   void finalizeImpl() override;
+   void preFinalize() override;
+   // void finalizeImpl() override;

private:
    void cacheData(char * data, size_t size);
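Taken together, the two hunks above move the cache flush from finalizeImpl() to preFinalize(): the cached copy is flushed and finalized (or closed and the error rethrown) before the buffer's own finalization runs, leaving nothing for the later finalize step to do. The sketch below only illustrates that ordering; CacheWriter, CachedWriteBuffer and their members are invented names for this example, not ClickHouse's classes, and mirror just the control flow visible in the diff.

```cpp
#include <iostream>
#include <memory>
#include <string>

/// Illustrative stand-in for the on-disk cache writer (hypothetical class).
struct CacheWriter
{
    void write(const std::string & data) { buffered += data; }
    void finalize() { std::cout << "cache finalized, " << buffered.size() << " bytes\n"; }
    std::string buffered;
};

/// Write buffer mirroring the pattern in the diff: flush pending data and
/// finalize the cache copy in preFinalize(), before the "real" finalize step.
class CachedWriteBuffer
{
public:
    void write(const std::string & data) { pending += data; }

    void preFinalize()
    {
        try
        {
            next();                          /// flush pending data (may throw)
        }
        catch (...)
        {
            if (cache_writer)
                cache_writer->finalize();    /// still close the cache file on error
            throw;
        }

        if (cache_writer)
        {
            cache_writer->finalize();
            cache_writer.reset();            /// nothing left for finalize() to do
        }
    }

    void finalize()
    {
        preFinalize();                       /// safe to call again: cache_writer is already reset
        std::cout << "buffer finalized\n";
    }

private:
    void next()
    {
        if (cache_writer)
            cache_writer->write(pending);
        pending.clear();
    }

    std::string pending;
    std::unique_ptr<CacheWriter> cache_writer = std::make_unique<CacheWriter>();
};

int main()
{
    CachedWriteBuffer buf;
    buf.write("hello");
    buf.preFinalize();   /// prints "cache finalized, 5 bytes"
    buf.finalize();      /// prints "buffer finalized"
}
```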
ReadBufferFromAzureBlobStorage.cpp:

@@ -47,6 +47,19 @@ ReadBufferFromAzureBlobStorage::ReadBufferFromAzureBlobStorage(
    }
}

+ SeekableReadBuffer::Range ReadBufferFromAzureBlobStorage::getRemainingReadRange() const
+ {
+     return Range{
+         .left = static_cast<size_t>(offset),
+         .right = read_until_position ? std::optional{read_until_position - 1} : std::nullopt
+     };
+ }
+
+ void ReadBufferFromAzureBlobStorage::setReadUntilPosition(size_t position)
+ {
+     read_until_position = position;
+     initialized = false;
+ }

bool ReadBufferFromAzureBlobStorage::nextImpl()
{
ReadBufferFromAzureBlobStorage.h:

@@ -36,6 +36,12 @@

    String getFileName() const override { return path; }

+   void setReadUntilPosition(size_t position) override;
+
+   Range getRemainingReadRange() const override;
+
+   bool supportsRightBoundedReads() const override { return true; }

private:

    void initialize();
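The new methods let the cache layer ask how much of the blob the buffer still intends to read: left is the current offset, and right is read_until_position - 1 when a right bound has been set (the stored position is exclusive, the reported bound inclusive), otherwise unbounded. A minimal standalone sketch of that bookkeeping, compiled as C++20 for the designated initializers; BoundedReader is an invented stand-in for the real buffer, only Range and the member names come from the diff.

```cpp
#include <cstddef>
#include <iostream>
#include <optional>
#include <string>

/// Simplified stand-in for SeekableReadBuffer::Range from the diff:
/// a left offset plus an optional inclusive right bound.
struct Range
{
    size_t left = 0;
    std::optional<size_t> right;
};

struct BoundedReader
{
    size_t offset = 0;              /// absolute position of the next read
    size_t read_until_position = 0; /// exclusive right bound, 0 means unbounded
    bool initialized = false;

    void setReadUntilPosition(size_t position)
    {
        read_until_position = position;
        initialized = false;        /// force re-initialization with the new bound
    }

    Range getRemainingReadRange() const
    {
        return Range{
            .left = offset,
            /// read_until_position is exclusive, Range::right is inclusive,
            /// hence the "- 1" seen in the diff
            .right = read_until_position ? std::optional{read_until_position - 1} : std::nullopt,
        };
    }
};

int main()
{
    BoundedReader reader;
    reader.offset = 100;
    reader.setReadUntilPosition(1000);

    Range r = reader.getRemainingReadRange();
    std::cout << r.left << " .. " << (r.right ? std::to_string(*r.right) : "eof") << '\n'; /// 100 .. 999
}
```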
Stateless test storage configuration (storage_conf.xml):

@@ -74,6 +74,24 @@
            <path>s3_cache_5/</path>
            <max_size>22548578304</max_size>
        </s3_cache_5>
+       <azure>
+           <type>azure_blob_storage</type>
+           <storage_account_url>http://localhost:10000/devstoreaccount1</storage_account_url>
+           <container_name>cont</container_name>
+           <container_already_exists>true</container_already_exists>
+           <skip_access_check>false</skip_access_check>
+           <!-- default credentials for Azurite storage account -->
+           <account_name>devstoreaccount1</account_name>
+           <account_key>Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==</account_key>
+           <max_single_part_upload_size>33554432</max_single_part_upload_size>
+       </azure>
+       <cached_azure>
+           <type>cache</type>
+           <disk>azure</disk>
+           <path>/home/ubuntu/azure_cache/</path>
+           <max_size>100000000000</max_size>
+           <cache_on_write_operations>1</cache_on_write_operations>
+       </cached_azure>
        <!-- local disks -->
        <local_disk>
            <type>local</type>
@@ -180,6 +198,13 @@
                </main>
            </volumes>
        </local_cache_3>
+       <azure_cache>
+           <volumes>
+               <main>
+                   <disk>cached_azure</disk>
+               </main>
+           </volumes>
+       </azure_cache>
    </policies>
</storage_configuration>
</clickhouse>
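In this configuration cached_azure is a <type>cache</type> disk layered on top of the azure disk, and the azure_cache policy exposes it to tables: reads are served from the local cache path when possible and fall back to blob storage otherwise, while cache_on_write_operations=1 also populates the cache on writes. A toy read-through/write-through sketch of that layering follows; RemoteDisk and CachedDisk are invented names for this illustration, not ClickHouse's disk interfaces.

```cpp
#include <iostream>
#include <map>
#include <string>

/// Invented stand-in for the remote (Azure) disk: every read is a round trip.
struct RemoteDisk
{
    std::map<std::string, std::string> blobs;
    std::string read(const std::string & path)
    {
        std::cout << "remote read: " << path << '\n';
        return blobs.at(path);
    }
    void write(const std::string & path, const std::string & data) { blobs[path] = data; }
};

/// Cache layer over the remote disk, roughly what <type>cache</type> configures:
/// serve reads from the local cache when possible, optionally cache on write.
struct CachedDisk
{
    RemoteDisk & remote;
    std::map<std::string, std::string> local_cache;   /// stands in for the <path> directory
    bool cache_on_write_operations = true;            /// mirrors the config flag

    std::string read(const std::string & path)
    {
        if (auto it = local_cache.find(path); it != local_cache.end())
            return it->second;                        /// cache hit, no remote round trip
        std::string data = remote.read(path);         /// cache miss: download and remember
        local_cache[path] = data;
        return data;
    }

    void write(const std::string & path, const std::string & data)
    {
        remote.write(path, data);                     /// the blob always lands in object storage
        if (cache_on_write_operations)
            local_cache[path] = data;                 /// and, optionally, in the local cache
    }
};

int main()
{
    RemoteDisk azure;
    CachedDisk cached{azure};

    cached.write("part_1/data.bin", "payload");
    cached.read("part_1/data.bin");   /// served from cache: no "remote read" line is printed
}
```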
Azure integration test storage configuration (storage_conf.xml):

@@ -5,7 +5,7 @@
            <type>azure_blob_storage</type>
            <storage_account_url>http://azurite1:10000/devstoreaccount1</storage_account_url>
            <container_name>cont</container_name>
-           <container_already_exists>false</container_already_exists>
+           <container_already_exists>true</container_already_exists>
            <skip_access_check>false</skip_access_check>
            <!-- default credentials for Azurite storage account -->
            <account_name>devstoreaccount1</account_name>
@@ -16,6 +16,12 @@
            <type>local</type>
            <path>/</path>
        </hdd>
+       <cache_on_jbod>
+           <type>cache</type>
+           <disk>blob_storage_disk</disk>
+           <path>/jbod1/</path>
+           <max_size>1000000000</max_size>
+       </cache_on_jbod>
    </disks>
    <policies>
        <blob_storage_policy>
@@ -28,6 +34,13 @@
                </external>
            </volumes>
        </blob_storage_policy>
+       <cache_on_jbod>
+           <volumes>
+               <main>
+                   <disk>cache_on_jbod</disk>
+               </main>
+           </volumes>
+       </cache_on_jbod>
    </policies>
</storage_configuration>
</clickhouse>
Azure Blob Storage integration test (test.py):

@@ -32,6 +32,17 @@ def cluster():
            ],
            with_azurite=True,
        )
+       cluster.add_instance(
+           "node_with_limited_disk",
+           main_configs=[
+               "configs/config.d/storage_conf.xml",
+               "configs/config.d/bg_processing_pool_conf.xml",
+           ],
+           with_minio=True,
+           tmpfs=[
+               "/jbod1:size=2M",
+           ],
+       )
        logging.info("Starting cluster...")
        cluster.start()
        logging.info("Cluster started")
@@ -574,3 +585,18 @@ def test_big_insert(cluster):
        f"INSERT INTO {TABLE_NAME} select '2020-01-03', number, toString(number) from numbers(5000000)",
    )
    assert int(azure_query(node, f"SELECT count() FROM {TABLE_NAME}")) == 5000000
+
+
+ def test_cache_with_full_disk_space(cluster):
+     node = cluster.instances["node_with_limited_disk"]
+     settings = {"storage_policy":"cache_on_jbod"}
+     create_table(node, TABLE_NAME, **settings)
+     azure_query(
+         node,
+         f"INSERT INTO {TABLE_NAME} select '2020-01-03', number, toString(number) from numbers(500000)",
+     )
+     azure_query(node, f"SELECT * FROM {TABLE_NAME} WHERE value LIKE '%abc%' ORDER BY value FORMAT Null")
+
+     assert node.contains_in_log(
+         "Insert into cache is skipped due to insufficient disk space"
+     )
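The new test mounts a 2 MB tmpfs at /jbod1 and backs the cache_on_jbod policy with it, so the cache volume fills up almost immediately; the expected behaviour is that the insert and the subsequent read still succeed against the remote disk while caching is skipped, which the test asserts via the "Insert into cache is skipped due to insufficient disk space" log line. Below is a best-effort reservation sketch of that behaviour, using invented names rather than the FileCache API.

```cpp
#include <iostream>
#include <string>

/// Toy cache with a hard capacity; tryReserve fails instead of throwing.
struct BoundedCache
{
    size_t capacity;
    size_t used = 0;

    bool tryReserve(size_t bytes)
    {
        if (used + bytes > capacity)
            return false;
        used += bytes;
        return true;
    }
};

/// Write path sketch: data always reaches remote storage; caching is best effort.
void writePart(BoundedCache & cache, const std::string & name, size_t bytes)
{
    /// (1) upload to the remote disk -- omitted here
    /// (2) try to also keep a local copy
    if (cache.tryReserve(bytes))
        std::cout << name << ": cached " << bytes << " bytes\n";
    else
        std::cout << name << ": Insert into cache is skipped due to insufficient disk space\n";
}

int main()
{
    BoundedCache cache{2 * 1024 * 1024};     /// ~2 MB, like the /jbod1 tmpfs in the test

    writePart(cache, "part_1", 1'500'000);   /// fits in the cache
    writePart(cache, "part_2", 1'500'000);   /// does not fit -> skipped, the write itself still succeeds
}
```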
02226_filesystem_cache_profile_events.reference:

@@ -10,3 +10,9 @@ SELECT 2, * FROM test LIMIT 10 FORMAT Null; 0 1 0
0
SELECT 3, * FROM test LIMIT 10 FORMAT Null; 0 1 0

Using storage policy: azure_cache
SELECT 1, * FROM test LIMIT 10 FORMAT Null; 1 0 1
SELECT 2, * FROM test LIMIT 10 FORMAT Null; 0 1 0
0
SELECT 3, * FROM test LIMIT 10 FORMAT Null; 0 1 0
02226_filesystem_cache_profile_events.sh:

@@ -11,7 +11,7 @@ TMP_PATH=${CLICKHOUSE_TEST_UNIQUE_NAME}
QUERIES_FILE=02226_filesystem_cache_profile_events.queries
TEST_FILE=$CUR_DIR/filesystem_cache_queries/$QUERIES_FILE

- for storagePolicy in 's3_cache' 'local_cache'; do
+ for storagePolicy in 's3_cache' 'local_cache' 'azure_cache'; do
    echo "Using storage policy: $storagePolicy"
    cat $TEST_FILE | sed -e "s/_storagePolicy/${storagePolicy}/" > $TMP_PATH
    ${CLICKHOUSE_CLIENT} --queries-file $TMP_PATH
02241_filesystem_cache_on_write_operations.reference:

@@ -250,3 +250,129 @@ SELECT count() FROM test;
SELECT count() FROM test WHERE value LIKE '%010%';
18816

Using storage policy: azure_cache
-- { echo }

SET enable_filesystem_cache_on_write_operations=1;
DROP TABLE IF EXISTS test;
CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='azure_cache', min_bytes_for_wide_part = 10485760;
SYSTEM STOP MERGES test;
SYSTEM DROP FILESYSTEM CACHE;
SELECT file_segment_range_begin, file_segment_range_end, size, state
FROM
(
SELECT file_segment_range_begin, file_segment_range_end, size, state, local_path
FROM
(
SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path
FROM system.remote_data_paths
) AS data_paths
INNER JOIN
system.filesystem_cache AS caches
ON data_paths.cache_path = caches.cache_path
)
WHERE endsWith(local_path, 'data.bin')
FORMAT Vertical;
SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path;
0
SELECT count() FROM system.filesystem_cache;
0
INSERT INTO test SELECT number, toString(number) FROM numbers(100);
SELECT file_segment_range_begin, file_segment_range_end, size, state
FROM
(
SELECT file_segment_range_begin, file_segment_range_end, size, state, local_path
FROM
(
SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path
FROM system.remote_data_paths
) AS data_paths
INNER JOIN
system.filesystem_cache AS caches
ON data_paths.cache_path = caches.cache_path
)
WHERE endsWith(local_path, 'data.bin')
FORMAT Vertical;
Row 1:
──────
file_segment_range_begin: 0
file_segment_range_end: 745
size: 746
state: DOWNLOADED
SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path;
7
SELECT count() FROM system.filesystem_cache;
7
SELECT count() FROM system.filesystem_cache WHERE cache_hits > 0;
0
SELECT * FROM test FORMAT Null;
SELECT count() FROM system.filesystem_cache WHERE cache_hits > 0;
2
SELECT * FROM test FORMAT Null;
SELECT count() FROM system.filesystem_cache WHERE cache_hits > 0;
2
SELECT count() size FROM system.filesystem_cache;
7
SYSTEM DROP FILESYSTEM CACHE;
INSERT INTO test SELECT number, toString(number) FROM numbers(100, 200);
SELECT file_segment_range_begin, file_segment_range_end, size, state
FROM
(
SELECT file_segment_range_begin, file_segment_range_end, size, state, local_path
FROM
(
SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path
FROM system.remote_data_paths
) AS data_paths
INNER JOIN
system.filesystem_cache AS caches
ON data_paths.cache_path = caches.cache_path
)
WHERE endsWith(local_path, 'data.bin')
FORMAT Vertical;
Row 1:
──────
file_segment_range_begin: 0
file_segment_range_end: 1659
size: 1660
state: DOWNLOADED
SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path;
7
SELECT count() FROM system.filesystem_cache;
7
SELECT count() FROM system.filesystem_cache;
7
INSERT INTO test SELECT number, toString(number) FROM numbers(100) SETTINGS enable_filesystem_cache_on_write_operations=0;
SELECT count() FROM system.filesystem_cache;
7
INSERT INTO test SELECT number, toString(number) FROM numbers(100);
INSERT INTO test SELECT number, toString(number) FROM numbers(300, 10000);
SELECT count() FROM system.filesystem_cache;
21
SYSTEM START MERGES test;
OPTIMIZE TABLE test FINAL;
SELECT count() FROM system.filesystem_cache;
31
SET mutations_sync=2;
ALTER TABLE test UPDATE value = 'kek' WHERE key = 100;
SELECT count() FROM system.filesystem_cache;
38
INSERT INTO test SELECT number, toString(number) FROM numbers(5000000);
SYSTEM FLUSH LOGS;
SELECT
query, ProfileEvents['RemoteFSReadBytes'] > 0 as remote_fs_read
FROM
system.query_log
WHERE
query LIKE 'SELECT number, toString(number) FROM numbers(5000000)%'
AND type = 'QueryFinish'
AND current_database = currentDatabase()
ORDER BY
query_start_time
DESC
LIMIT 1;
SELECT count() FROM test;
5010500
SELECT count() FROM test WHERE value LIKE '%010%';
18816
02241_filesystem_cache_on_write_operations.sh:

@@ -11,7 +11,7 @@ TMP_PATH=${CLICKHOUSE_TEST_UNIQUE_NAME}
QUERIES_FILE=02241_filesystem_cache_on_write_operations.queries
TEST_FILE=$CUR_DIR/filesystem_cache_queries/$QUERIES_FILE

- for storagePolicy in 's3_cache' 'local_cache'; do
+ for storagePolicy in 's3_cache' 'local_cache' 'azure_cache'; do
    echo "Using storage policy: $storagePolicy"
    cat $TEST_FILE | sed -e "s/_storagePolicy/${storagePolicy}/" > $TMP_PATH
    ${CLICKHOUSE_CLIENT} --queries-file $TMP_PATH