Fix checks

kssenii 2022-08-09 13:02:52 +02:00
parent ece37d2bd9
commit 0dda03c94b
10 changed files with 55 additions and 159 deletions

View File

@ -317,6 +317,7 @@ else
rm -f /etc/clickhouse-server/users.d/database_ordinary.xml ||:
# Remove s3 related configs to avoid "there is no disk type `cache`"
rm -f /etc/clickhouse-server/config.d/storage_conf.xml ||:
rm -f /etc/clickhouse-server/config.d/azure_storage_conf.xml ||:
start

View File

@ -16,6 +16,18 @@ namespace ProfileEvents
namespace DB
{
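/// RAII helper: swaps the two buffers on construction and swaps them back on
/// destruction, so the caller's buffer state is restored even if an exception
/// is thrown in between.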
class SwapHelper
{
public:
SwapHelper(WriteBuffer & b1_, WriteBuffer & b2_) : b1(b1_), b2(b2_) { b1.swap(b2); }
~SwapHelper() { b1.swap(b2); }
private:
WriteBuffer & b1;
WriteBuffer & b2;
};
CachedOnDiskWriteBufferFromFile::CachedOnDiskWriteBufferFromFile(
std::unique_ptr<WriteBuffer> impl_,
FileCachePtr cache_,
@ -39,10 +51,10 @@ CachedOnDiskWriteBufferFromFile::CachedOnDiskWriteBufferFromFile(
void CachedOnDiskWriteBufferFromFile::nextImpl()
{
size_t size = offset();
swap(*impl);
try
{
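/// The swap is reverted automatically when `swap` leaves scope, including on exception.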
SwapHelper swap(*this, *impl);
/// Write data to the underlying buffer.
impl->next();
}
@ -55,8 +67,6 @@ void CachedOnDiskWriteBufferFromFile::nextImpl()
throw;
}
swap(*impl);
/// Write data to cache.
cacheData(working_buffer.begin(), size);
current_download_offset += size;
@ -133,8 +143,26 @@ void CachedOnDiskWriteBufferFromFile::appendFilesystemCacheLog(const FileSegment
}
}
void CachedOnDiskWriteBufferFromFile::preFinalize()
void CachedOnDiskWriteBufferFromFile::finalizeImpl()
{
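/// Finalize the underlying buffer under a scoped swap; on failure, still
/// finalize and release the cache writer before rethrowing.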
try
{
SwapHelper swap(*this, *impl);
impl->finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
if (cache_writer)
{
cache_writer->finalize();
cache_writer.reset();
}
throw;
}
if (cache_writer)
{
cache_writer->finalize();
@ -142,24 +170,4 @@ void CachedOnDiskWriteBufferFromFile::preFinalize()
}
}
/// void CachedOnDiskWriteBufferFromFile::finalizeImpl()
/// {
/// // try
/// // {
/// // next();
/// // }
/// // catch (...)
/// // {
/// // tryLogCurrentException(__PRETTY_FUNCTION__);
///
/// // if (cache_writer)
/// // cache_writer->finalize();
///
/// // throw;
/// // }
///
/// if (cache_writer)
/// cache_writer->finalize();
/// }
}

View File

@ -29,8 +29,7 @@ public:
void nextImpl() override;
void preFinalize() override;
// void finalizeImpl() override;
void finalizeImpl() override;
private:
void cacheData(char * data, size_t size);

View File

@ -9,6 +9,7 @@
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageAuth.h>
#include <Common/logger_useful.h>
namespace DB
@ -29,6 +30,7 @@ AzureObjectStorage::AzureObjectStorage(
: name(name_)
, client(std::move(client_))
, settings(std::move(settings_))
, log(&Poco::Logger::get("AzureObjectStorage"))
{
}
@ -123,6 +125,8 @@ std::unique_ptr<WriteBufferFromFileBase> AzureObjectStorage::writeObject( /// NO
if (mode != WriteMode::Rewrite)
throw Exception("Azure storage doesn't support append", ErrorCodes::UNSUPPORTED_METHOD);
LOG_TEST(log, "Writing file: {}", object.absolute_path);
auto buffer = std::make_unique<WriteBufferFromAzureBlobStorage>(
client.get(),
object.absolute_path,
@ -151,6 +155,7 @@ void AzureObjectStorage::listPrefix(const std::string & path, RelativePathsWithS
void AzureObjectStorage::removeObject(const StoredObject & object)
{
const auto & path = object.absolute_path;
LOG_TEST(log, "Removing single object: {}", path);
auto client_ptr = client.get();
auto delete_info = client_ptr->DeleteBlob(path);
if (!delete_info.Value.Deleted)
@ -162,6 +167,7 @@ void AzureObjectStorage::removeObjects(const StoredObjects & objects)
auto client_ptr = client.get();
for (const auto & object : objects)
{
LOG_TEST(log, "Removing object: {} (total: {})", object.absolute_path, objects.size());
auto delete_info = client_ptr->DeleteBlob(object.absolute_path);
if (!delete_info.Value.Deleted)
throw Exception(ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Failed to delete file in AzureBlob Storage: {}", object.absolute_path);

View File

@ -16,6 +16,11 @@
#include <azure/storage/blobs.hpp>
#endif
namespace Poco
{
class Logger;
}
namespace DB
{
@ -122,6 +127,8 @@ private:
/// client used to access the files in the Blob Storage cloud
MultiVersion<Azure::Storage::Blobs::BlobContainerClient> client;
MultiVersion<AzureObjectStorageSettings> settings;
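/// Obtained from Poco's logger registry; the registry owns it, so a raw pointer suffices.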
Poco::Logger * log;
};
}

View File

@ -66,10 +66,10 @@ void CachedObjectStorage::startup()
bool CachedObjectStorage::exists(const StoredObject & object) const
{
fs::path cache_path = getCachePath(object.getPathKeyForCache());
// fs::path cache_path = getCachePath(object.getPathKeyForCache());
if (fs::exists(cache_path) && !cache_path.empty())
return true;
// if (fs::exists(cache_path) && !cache_path.empty())
// return true;
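/// Existence checks are now always delegated to the wrapped object storage;
/// a file present in the local cache alone does not prove the remote object exists.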
return object_storage->exists(object);
}

View File

@ -1019,7 +1019,7 @@ void DatabaseCatalog::dropTableFinally(const TableMarkedAsDropped & table)
for (const auto & [disk_name, disk] : getContext()->getDisksMap())
{
String data_path = "store/" + getPathForUUID(table.table_id.uuid);
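/// Nothing can (or needs to) be removed from a read-only disk.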
if (!disk->exists(data_path))
if (!disk->exists(data_path) || disk->isReadOnly())
continue;
LOG_INFO(log, "Removing data directory {} of dropped table {} from disk {}", data_path, table.table_id.getNameForLogs(), disk_name);

View File

@ -499,9 +499,10 @@ void IMergeTreeDataPart::removeIfNeeded()
if (!is_temp && state != State::DeleteOnDestroy)
return;
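/// Declared before the try block so the catch handler below can include it in the log message.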
std::string path;
try
{
auto path = data_part_storage->getRelativePath();
path = data_part_storage->getRelativePath();
if (!data_part_storage->exists()) // path
return;
@ -537,7 +538,7 @@ void IMergeTreeDataPart::removeIfNeeded()
/// If it's tmp_merge_<part_name> or tmp_fetch_<part_name>,
/// then all future attempts to execute part producing operation will fail with "directory already exists".
/// Seems like it's especially important for remote disks, because removal may fail due to network issues.
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(__PRETTY_FUNCTION__, "while removiong path: " + path);
assert(!is_temp);
assert(state != State::DeleteOnDestroy);
assert(state != State::Temporary);

View File

@ -250,129 +250,3 @@ SELECT count() FROM test;
SELECT count() FROM test WHERE value LIKE '%010%';
18816
Using storage policy: azure_cache
-- { echo }
SET enable_filesystem_cache_on_write_operations=1;
DROP TABLE IF EXISTS test;
CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='azure_cache', min_bytes_for_wide_part = 10485760;
SYSTEM STOP MERGES test;
SYSTEM DROP FILESYSTEM CACHE;
SELECT file_segment_range_begin, file_segment_range_end, size, state
FROM
(
SELECT file_segment_range_begin, file_segment_range_end, size, state, local_path
FROM
(
SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path
FROM system.remote_data_paths
) AS data_paths
INNER JOIN
system.filesystem_cache AS caches
ON data_paths.cache_path = caches.cache_path
)
WHERE endsWith(local_path, 'data.bin')
FORMAT Vertical;
SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path;
0
SELECT count() FROM system.filesystem_cache;
0
INSERT INTO test SELECT number, toString(number) FROM numbers(100);
SELECT file_segment_range_begin, file_segment_range_end, size, state
FROM
(
SELECT file_segment_range_begin, file_segment_range_end, size, state, local_path
FROM
(
SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path
FROM system.remote_data_paths
) AS data_paths
INNER JOIN
system.filesystem_cache AS caches
ON data_paths.cache_path = caches.cache_path
)
WHERE endsWith(local_path, 'data.bin')
FORMAT Vertical;
Row 1:
──────
file_segment_range_begin: 0
file_segment_range_end: 745
size: 746
state: DOWNLOADED
SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path;
7
SELECT count() FROM system.filesystem_cache;
7
SELECT count() FROM system.filesystem_cache WHERE cache_hits > 0;
0
SELECT * FROM test FORMAT Null;
SELECT count() FROM system.filesystem_cache WHERE cache_hits > 0;
2
SELECT * FROM test FORMAT Null;
SELECT count() FROM system.filesystem_cache WHERE cache_hits > 0;
2
SELECT count() size FROM system.filesystem_cache;
7
SYSTEM DROP FILESYSTEM CACHE;
INSERT INTO test SELECT number, toString(number) FROM numbers(100, 200);
SELECT file_segment_range_begin, file_segment_range_end, size, state
FROM
(
SELECT file_segment_range_begin, file_segment_range_end, size, state, local_path
FROM
(
SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path
FROM system.remote_data_paths
) AS data_paths
INNER JOIN
system.filesystem_cache AS caches
ON data_paths.cache_path = caches.cache_path
)
WHERE endsWith(local_path, 'data.bin')
FORMAT Vertical;
Row 1:
──────
file_segment_range_begin: 0
file_segment_range_end: 1659
size: 1660
state: DOWNLOADED
SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path;
7
SELECT count() FROM system.filesystem_cache;
7
SELECT count() FROM system.filesystem_cache;
7
INSERT INTO test SELECT number, toString(number) FROM numbers(100) SETTINGS enable_filesystem_cache_on_write_operations=0;
SELECT count() FROM system.filesystem_cache;
7
INSERT INTO test SELECT number, toString(number) FROM numbers(100);
INSERT INTO test SELECT number, toString(number) FROM numbers(300, 10000);
SELECT count() FROM system.filesystem_cache;
21
SYSTEM START MERGES test;
OPTIMIZE TABLE test FINAL;
SELECT count() FROM system.filesystem_cache;
31
SET mutations_sync=2;
ALTER TABLE test UPDATE value = 'kek' WHERE key = 100;
SELECT count() FROM system.filesystem_cache;
38
INSERT INTO test SELECT number, toString(number) FROM numbers(5000000);
SYSTEM FLUSH LOGS;
SELECT
query, ProfileEvents['RemoteFSReadBytes'] > 0 as remote_fs_read
FROM
system.query_log
WHERE
query LIKE 'SELECT number, toString(number) FROM numbers(5000000)%'
AND type = 'QueryFinish'
AND current_database = currentDatabase()
ORDER BY
query_start_time
DESC
LIMIT 1;
SELECT count() FROM test;
5010500
SELECT count() FROM test WHERE value LIKE '%010%';
18816

View File

@ -11,7 +11,7 @@ TMP_PATH=${CLICKHOUSE_TEST_UNIQUE_NAME}
QUERIES_FILE=02241_filesystem_cache_on_write_operations.queries
TEST_FILE=$CUR_DIR/filesystem_cache_queries/$QUERIES_FILE
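# Run the shared query file once per storage policy, substituting the _storagePolicy placeholder.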
for storagePolicy in 's3_cache' 'local_cache' 'azure_cache'; do
for storagePolicy in 's3_cache' 'local_cache'; do
echo "Using storage policy: $storagePolicy"
cat $TEST_FILE | sed -e "s/_storagePolicy/${storagePolicy}/" > $TMP_PATH
${CLICKHOUSE_CLIENT} --queries-file $TMP_PATH