Merge branch 'master' into add-system-s3-queue-log-to-config

Commit 1dfafcfe5a by Kseniia Sumarokova, 2023-11-21 21:31:35 +01:00 (committed by GitHub)
69 changed files with 1200 additions and 102 deletions

contrib/grpc (vendored submodule)

@ -1 +1 @@
Subproject commit 740e3dfd97301a52ad8165b65285bcc149d9e817
Subproject commit 77b2737a709d43d8c6895e3f03ca62b00bd9201c

View File

@ -2740,7 +2740,7 @@ ClickHouse will use it to form the proxy URI using the following template: `{pro
<proxy_cache_time>10</proxy_cache_time>
</resolver>
</http>
<https>
<resolver>
<endpoint>http://resolver:8080/hostname</endpoint>

View File

@ -0,0 +1,59 @@
---
slug: /en/operations/system-tables/blob_storage_log
---
# Blob Storage Operations Log
Contains logging entries with information about various blob storage operations such as uploads and deletes.
Columns:
- `event_date` ([Date](../../sql-reference/data-types/date.md)) — Date of the event.
- `event_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — Time of the event.
- `event_time_microseconds` ([DateTime64](../../sql-reference/data-types/datetime64.md)) — Time of the event with microseconds precision.
- `event_type` ([Enum8](../../sql-reference/data-types/enum.md)) — Type of the event. Possible values:
- `'Upload'`
- `'Delete'`
- `'MultiPartUploadCreate'`
- `'MultiPartUploadWrite'`
- `'MultiPartUploadComplete'`
- `'MultiPartUploadAbort'`
- `query_id` ([String](../../sql-reference/data-types/string.md)) — Identifier of the query associated with the event, if any.
- `thread_id` ([UInt64](../../sql-reference/data-types/int-uint.md#uint-ranges)) — Identifier of the thread performing the operation.
- `thread_name` ([String](../../sql-reference/data-types/string.md)) — Name of the thread performing the operation.
- `disk_name` ([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md)) — Name of the associated disk.
- `bucket` ([String](../../sql-reference/data-types/string.md)) — Name of the bucket.
- `remote_path` ([String](../../sql-reference/data-types/string.md)) — Path to the remote resource.
- `local_path` ([String](../../sql-reference/data-types/string.md)) — Path to the metadata file on the local system, which references the remote resource.
- `data_size` ([UInt64](../../sql-reference/data-types/int-uint.md#uint-ranges)) — Size of the data involved in the event.
- `error` ([String](../../sql-reference/data-types/string.md)) — Error message associated with the event, if any.
**Example**
Suppose a blob storage operation uploads a file, and an event is logged:
```sql
SELECT * FROM system.blob_storage_log WHERE query_id = '7afe0450-504d-4e4b-9a80-cd9826047972' ORDER BY event_date, event_time_microseconds \G
```
```text
Row 1:
──────
event_date: 2023-10-31
event_time: 2023-10-31 16:03:40
event_time_microseconds: 2023-10-31 16:03:40.481437
event_type: Upload
query_id: 7afe0450-504d-4e4b-9a80-cd9826047972
thread_id: 2381740
disk_name: disk_s3
bucket: bucket1
remote_path: rrr/kxo/tbnqtrghgtnxkzgtcrlutwuslgawe
local_path: store/654/6549e8b3-d753-4447-8047-d462df6e6dbe/tmp_insert_all_1_1_0/checksums.txt
data_size: 259
error:
```
In this example, the upload operation was associated with the `INSERT` query with ID `7afe0450-504d-4e4b-9a80-cd9826047972`. The local metadata file `store/654/6549e8b3-d753-4447-8047-d462df6e6dbe/tmp_insert_all_1_1_0/checksums.txt` refers to the remote path `rrr/kxo/tbnqtrghgtnxkzgtcrlutwuslgawe` in bucket `bucket1` on disk `disk_s3`, with a size of 259 bytes.
**See Also**
- [External Disks for Storing Data](../../operations/storing-data.md)

View File

@ -439,7 +439,7 @@ concat(s1, s2, ...)
**Arguments**
At least two values of arbitrary type.
At least one value of arbitrary type.
Arguments which are not of types [String](../../sql-reference/data-types/string.md) or [FixedString](../../sql-reference/data-types/fixedstring.md) are converted to strings using their default serialization. As this decreases performance, it is not recommended to use non-String/FixedString arguments.
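A brief illustration of the relaxed arity (a sketch based on the single-argument behaviour introduced in this commit, where `concat(x)` is built as `toString(x)`):

```sql
SELECT concat('Hello');                       -- now allowed: behaves like toString('Hello')
SELECT concat('Hello', ', ', 'world', '!');   -- unchanged multi-argument usage
```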

View File

@ -1257,6 +1257,16 @@
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</s3queue_log>
<!-- Blob storage object operations log.
-->
<blob_storage_log>
<database>system</database>
<table>blob_storage_log</table>
<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<ttl>event_date + INTERVAL 30 DAY</ttl>
</blob_storage_log>
<!-- <top_level_domains_path>/var/lib/clickhouse/top_level_domains/</top_level_domains_path> -->
<!-- Custom TLD lists.
Format: <name>/path/to/file</name>
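Once the `blob_storage_log` section above is enabled and the server restarted, the table appears after the first flush, like any other system log; a quick sanity check (a sketch, nothing here beyond standard system-log behaviour):

```sql
SYSTEM FLUSH LOGS;
SELECT count() FROM system.blob_storage_log;
```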

View File

@ -5,11 +5,214 @@
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/ConstantNode.h>
#include <Analyzer/JoinNode.h>
#include <Analyzer/HashUtils.h>
#include <Analyzer/Utils.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// Visitor that optimizes logical expressions _only_ in JOIN ON section
class JoinOnLogicalExpressionOptimizerVisitor : public InDepthQueryTreeVisitorWithContext<JoinOnLogicalExpressionOptimizerVisitor>
{
public:
using Base = InDepthQueryTreeVisitorWithContext<JoinOnLogicalExpressionOptimizerVisitor>;
explicit JoinOnLogicalExpressionOptimizerVisitor(ContextPtr context)
: Base(std::move(context))
{}
void enterImpl(QueryTreeNodePtr & node)
{
auto * function_node = node->as<FunctionNode>();
if (!function_node)
return;
if (function_node->getFunctionName() == "or")
{
bool is_argument_type_changed = tryOptimizeIsNotDistinctOrIsNull(node, getContext());
if (is_argument_type_changed)
need_rerun_resolve = true;
return;
}
}
void leaveImpl(QueryTreeNodePtr & node)
{
if (!need_rerun_resolve)
return;
if (auto * function_node = node->as<FunctionNode>())
rerunFunctionResolve(function_node, getContext());
}
private:
bool need_rerun_resolve = false;
/// Returns true if type of some operand is changed and parent function needs to be re-resolved
static bool tryOptimizeIsNotDistinctOrIsNull(QueryTreeNodePtr & node, const ContextPtr & context)
{
auto & function_node = node->as<FunctionNode &>();
chassert(function_node.getFunctionName() == "or");
QueryTreeNodes or_operands;
or_operands.reserve(function_node.getArguments().getNodes().size());
/// Indices of `equals` or `isNotDistinctFrom` functions in the vector above
std::vector<size_t> equals_functions_indices;
/** Map from `isNull` argument to indices of operands that contain that `isNull` function
* `a = b OR (a IS NULL AND b IS NULL) OR (a IS NULL AND c IS NULL)`
* will be mapped to
* {
* a => [(a IS NULL AND b IS NULL), (a IS NULL AND c IS NULL)]
* b => [(a IS NULL AND b IS NULL)]
* c => [(a IS NULL AND c IS NULL)]
* }
* Then for each a <=> b we can find all operands that contains both a IS NULL and b IS NULL
*/
QueryTreeNodePtrWithHashMap<std::vector<size_t>> is_null_argument_to_indices;
for (const auto & argument : function_node.getArguments())
{
or_operands.push_back(argument);
auto * argument_function = argument->as<FunctionNode>();
if (!argument_function)
continue;
const auto & func_name = argument_function->getFunctionName();
if (func_name == "equals" || func_name == "isNotDistinctFrom")
{
equals_functions_indices.push_back(or_operands.size() - 1);
}
else if (func_name == "and")
{
for (const auto & and_argument : argument_function->getArguments().getNodes())
{
auto * and_argument_function = and_argument->as<FunctionNode>();
if (and_argument_function && and_argument_function->getFunctionName() == "isNull")
{
const auto & is_null_argument = and_argument_function->getArguments().getNodes()[0];
is_null_argument_to_indices[is_null_argument].push_back(or_operands.size() - 1);
}
}
}
}
/// OR operands that were changed and need to be re-resolved
std::unordered_set<size_t> arguments_to_reresolve;
for (size_t equals_function_idx : equals_functions_indices)
{
auto * equals_function = or_operands[equals_function_idx]->as<FunctionNode>();
/// For a <=> b we are looking for expressions containing both `a IS NULL` and `b IS NULL` combined with AND
const auto & argument_nodes = equals_function->getArguments().getNodes();
const auto & lhs_is_null_parents = is_null_argument_to_indices[argument_nodes[0]];
const auto & rhs_is_null_parents = is_null_argument_to_indices[argument_nodes[1]];
std::unordered_set<size_t> operands_to_optimize;
std::set_intersection(lhs_is_null_parents.begin(), lhs_is_null_parents.end(),
rhs_is_null_parents.begin(), rhs_is_null_parents.end(),
std::inserter(operands_to_optimize, operands_to_optimize.begin()));
/// If we have `a = b OR (a IS NULL AND b IS NULL)` we can optimize it to `a <=> b`
if (!operands_to_optimize.empty() && equals_function->getFunctionName() == "equals")
arguments_to_reresolve.insert(equals_function_idx);
for (size_t to_optimize_idx : operands_to_optimize)
{
/// We are looking for operand `a IS NULL AND b IS NULL AND ...`
auto * operand_to_optimize = or_operands[to_optimize_idx]->as<FunctionNode>();
/// Remove `a IS NULL` and `b IS NULL` arguments from AND
QueryTreeNodes new_arguments;
for (const auto & and_argument : operand_to_optimize->getArguments().getNodes())
{
bool to_eliminate = false;
const auto * and_argument_function = and_argument->as<FunctionNode>();
if (and_argument_function && and_argument_function->getFunctionName() == "isNull")
{
const auto & is_null_argument = and_argument_function->getArguments().getNodes()[0];
to_eliminate = (is_null_argument->isEqual(*argument_nodes[0]) || is_null_argument->isEqual(*argument_nodes[1]));
}
if (to_eliminate)
arguments_to_reresolve.insert(to_optimize_idx);
else
new_arguments.emplace_back(and_argument);
}
/// If fewer than two arguments are left, we will remove or replace the whole AND below
operand_to_optimize->getArguments().getNodes() = std::move(new_arguments);
}
}
if (arguments_to_reresolve.empty())
/// Nothing has been changed
return false;
auto and_function_resolver = FunctionFactory::instance().get("and", context);
auto strict_equals_function_resolver = FunctionFactory::instance().get("isNotDistinctFrom", context);
bool need_reresolve = false;
QueryTreeNodes new_or_operands;
for (size_t i = 0; i < or_operands.size(); ++i)
{
if (arguments_to_reresolve.contains(i))
{
auto * function = or_operands[i]->as<FunctionNode>();
if (function->getFunctionName() == "equals")
{
/// We should replace `a = b` with `a <=> b` because we removed checks for IS NULL
need_reresolve |= function->getResultType()->isNullable();
function->resolveAsFunction(strict_equals_function_resolver);
new_or_operands.emplace_back(std::move(or_operands[i]));
}
else if (function->getFunctionName() == "and")
{
const auto & and_arguments = function->getArguments().getNodes();
if (and_arguments.size() > 1)
{
function->resolveAsFunction(and_function_resolver);
new_or_operands.emplace_back(std::move(or_operands[i]));
}
else if (and_arguments.size() == 1)
{
/// Replace AND with a single argument with the argument itself
new_or_operands.emplace_back(and_arguments[0]);
}
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected function name: '{}'", function->getFunctionName());
}
else
{
new_or_operands.emplace_back(std::move(or_operands[i]));
}
}
if (new_or_operands.size() == 1)
{
node = std::move(new_or_operands[0]);
return need_reresolve;
}
/// Rebuild OR function
auto or_function_resolver = FunctionFactory::instance().get("or", context);
function_node.getArguments().getNodes() = std::move(new_or_operands);
function_node.resolveAsFunction(or_function_resolver);
return need_reresolve;
}
};
class LogicalExpressionOptimizerVisitor : public InDepthQueryTreeVisitorWithContext<LogicalExpressionOptimizerVisitor>
{
public:
@ -21,6 +224,17 @@ public:
void enterImpl(QueryTreeNodePtr & node)
{
if (auto * join_node = node->as<JoinNode>())
{
/// Operator <=> is not supported outside of JOIN ON section
if (join_node->hasJoinExpression())
{
JoinOnLogicalExpressionOptimizerVisitor join_on_visitor(getContext());
join_on_visitor.visit(join_node->getJoinExpression());
}
return;
}
auto * function_node = node->as<FunctionNode>();
if (!function_node)
@ -38,6 +252,7 @@ public:
return;
}
}
private:
void tryReplaceAndEqualsChainsWithConstant(QueryTreeNodePtr & node)
{

View File

@ -67,6 +67,17 @@ namespace DB
* FROM TABLE
* WHERE a = 1 AND b = 'test';
* -------------------------------
*
* 5. Remove unnecessary IS NULL checks in JOIN ON clause
* - an equality check combined with explicit IS NULL checks on both operands is replaced with the <=> operator
* -------------------------------
* SELECT * FROM t1 JOIN t2 ON a = b OR (a IS NULL AND b IS NULL)
* SELECT * FROM t1 JOIN t2 ON a <=> b OR (a IS NULL AND b IS NULL)
*
* will be transformed into
*
* SELECT * FROM t1 JOIN t2 ON a <=> b
* -------------------------------
*/
class LogicalExpressionOptimizerPass final : public IQueryTreePass
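One way to observe the new rewrite is via the analyzer's query-tree explain; a hedged sketch (table and column names are made up, and the exact output shape depends on the server version):

```sql
SET allow_experimental_analyzer = 1;

EXPLAIN QUERY TREE
SELECT * FROM t1 JOIN t2 ON t1.a = t2.a OR (t1.a IS NULL AND t2.a IS NULL);
-- After this pass, the JOIN expression is expected to appear as isNotDistinctFrom(t1.a, t2.a).
```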

View File

@ -127,6 +127,9 @@ BackupReaderS3::BackupReaderS3(
request_settings.max_single_read_retries = context_->getSettingsRef().s3_max_single_read_retries; // FIXME: Avoid taking value for endpoint
request_settings.allow_native_copy = allow_s3_native_copy;
client = makeS3Client(s3_uri_, access_key_id_, secret_access_key_, s3_settings, context_);
if (auto blob_storage_system_log = context_->getBlobStorageLog())
blob_storage_log = std::make_shared<BlobStorageLogWriter>(blob_storage_system_log);
}
BackupReaderS3::~BackupReaderS3() = default;
@ -178,6 +181,7 @@ void BackupReaderS3::copyFileToDisk(const String & path_in_backup, size_t file_s
/* dest_key= */ blob_path[0],
s3_settings.request_settings,
read_settings,
blob_storage_log,
object_attributes,
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupReaderS3"),
/* for_disk_s3= */ true);
@ -214,6 +218,12 @@ BackupWriterS3::BackupWriterS3(
request_settings.allow_native_copy = allow_s3_native_copy;
request_settings.setStorageClassName(storage_class_name);
client = makeS3Client(s3_uri_, access_key_id_, secret_access_key_, s3_settings, context_);
if (auto blob_storage_system_log = context_->getBlobStorageLog())
{
blob_storage_log = std::make_shared<BlobStorageLogWriter>(blob_storage_system_log);
if (context_->hasQueryContext())
blob_storage_log->query_id = context_->getQueryContext()->getCurrentQueryId();
}
}
void BackupWriterS3::copyFileFromDisk(const String & path_in_backup, DiskPtr src_disk, const String & src_path,
@ -239,6 +249,7 @@ void BackupWriterS3::copyFileFromDisk(const String & path_in_backup, DiskPtr src
fs::path(s3_uri.key) / path_in_backup,
s3_settings.request_settings,
read_settings,
blob_storage_log,
{},
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWriterS3"));
return; /// copied!
@ -262,13 +273,15 @@ void BackupWriterS3::copyFile(const String & destination, const String & source,
fs::path(s3_uri.key) / destination,
s3_settings.request_settings,
read_settings,
blob_storage_log,
{},
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWriterS3"));
}
void BackupWriterS3::copyDataToFile(const String & path_in_backup, const CreateReadBufferFunction & create_read_buffer, UInt64 start_pos, UInt64 length)
{
copyDataToS3File(create_read_buffer, start_pos, length, client, s3_uri.bucket, fs::path(s3_uri.key) / path_in_backup, s3_settings.request_settings, {},
copyDataToS3File(create_read_buffer, start_pos, length, client, s3_uri.bucket, fs::path(s3_uri.key) / path_in_backup,
s3_settings.request_settings, blob_storage_log, {},
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWriterS3"));
}
@ -302,6 +315,7 @@ std::unique_ptr<WriteBuffer> BackupWriterS3::writeFile(const String & file_name)
fs::path(s3_uri.key) / file_name,
DBMS_DEFAULT_BUFFER_SIZE,
s3_settings.request_settings,
blob_storage_log,
std::nullopt,
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWriterS3"),
write_settings);
@ -311,8 +325,19 @@ void BackupWriterS3::removeFile(const String & file_name)
{
S3::DeleteObjectRequest request;
request.SetBucket(s3_uri.bucket);
request.SetKey(fs::path(s3_uri.key) / file_name);
auto key = fs::path(s3_uri.key) / file_name;
request.SetKey(key);
auto outcome = client->DeleteObject(request);
if (blob_storage_log)
{
blob_storage_log->addEvent(
BlobStorageLogElement::EventType::Delete,
s3_uri.bucket, key, /* local_path */ "", /* data_size */ 0,
outcome.IsSuccess() ? nullptr : &outcome.GetError());
}
if (!outcome.IsSuccess() && !isNotFoundError(outcome.GetError().GetErrorType()))
throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
}
@ -371,6 +396,16 @@ void BackupWriterS3::removeFilesBatch(const Strings & file_names)
request.SetDelete(delkeys);
auto outcome = client->DeleteObjects(request);
if (blob_storage_log)
{
const auto * outcome_error = outcome.IsSuccess() ? nullptr : &outcome.GetError();
auto time_now = std::chrono::system_clock::now();
for (const auto & obj : current_chunk)
blob_storage_log->addEvent(BlobStorageLogElement::EventType::Delete, s3_uri.bucket, obj.GetKey(),
/* local_path */ "", /* data_size */ 0, outcome_error, time_now);
}
if (!outcome.IsSuccess() && !isNotFoundError(outcome.GetError().GetErrorType()))
throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
}
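Since BackupReaderS3/BackupWriterS3 now carry a BlobStorageLogWriter, S3 backups should leave a trail in system.blob_storage_log; a hedged example (bucket, credentials and table name are placeholders):

```sql
BACKUP TABLE default.hits TO S3('https://my-bucket.s3.amazonaws.com/backups/hits', '<key>', '<secret>');

-- Upload (and, on cleanup, Delete) events for the backup files can then be
-- looked up in system.blob_storage_log by the backup query's query_id.
```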

View File

@ -8,7 +8,7 @@
#include <IO/S3Common.h>
#include <Storages/StorageS3Settings.h>
#include <Interpreters/Context_fwd.h>
#include <IO/S3/BlobStorageLogWriter.h>
namespace DB
{
@ -32,6 +32,8 @@ private:
const DataSourceDescription data_source_description;
S3Settings s3_settings;
std::shared_ptr<S3::Client> client;
BlobStorageLogWriterPtr blob_storage_log;
};
@ -63,6 +65,8 @@ private:
S3Settings s3_settings;
std::shared_ptr<S3::Client> client;
std::optional<bool> supports_batch_delete;
BlobStorageLogWriterPtr blob_storage_log;
};
}

View File

@ -17,6 +17,7 @@
#include <Interpreters/TransactionsInfoLog.h>
#include <Interpreters/AsynchronousInsertLog.h>
#include <Interpreters/BackupLog.h>
#include <IO/S3/BlobStorageLogWriter.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <Common/SystemLogBase.h>

View File

@ -31,7 +31,8 @@
M(FilesystemCacheLogElement) \
M(FilesystemReadPrefetchesLogElement) \
M(AsynchronousInsertLogElement) \
M(BackupLogElement)
M(BackupLogElement) \
M(BlobStorageLogElement)
namespace Poco
{

View File

@ -147,12 +147,14 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const SnapshotFileInfo & snapsh
const auto create_writer = [&](const auto & key)
{
/// blob_storage_log is not used for keeper
return WriteBufferFromS3(
s3_client->client,
s3_client->uri.bucket,
key,
DBMS_DEFAULT_BUFFER_SIZE,
request_settings_1
request_settings_1,
/* blob_log */ {}
);
};
@ -214,6 +216,7 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const SnapshotFileInfo & snapsh
delete_request.SetBucket(s3_client->uri.bucket);
delete_request.SetKey(lock_file);
auto delete_outcome = s3_client->client->DeleteObject(delete_request);
if (!delete_outcome.IsSuccess())
throw S3Exception(delete_outcome.GetError().GetMessage(), delete_outcome.GetError().GetErrorType());
}

View File

@ -235,6 +235,11 @@ std::shared_ptr<FilesystemReadPrefetchesLog> Context::getFilesystemReadPrefetche
return nullptr;
}
std::shared_ptr<BlobStorageLog> Context::getBlobStorageLog() const
{
return nullptr;
}
void Context::setConfig(const ConfigurationPtr & config)
{
auto lock = getGlobalLock();

View File

@ -27,6 +27,7 @@ struct ContextSharedPart;
class Macros;
class FilesystemCacheLog;
class FilesystemReadPrefetchesLog;
class BlobStorageLog;
/// A small class which owns ContextShared.
/// We don't use something like unique_ptr directly to allow ContextShared type to be incomplete.
@ -115,6 +116,7 @@ public:
std::shared_ptr<FilesystemCacheLog> getFilesystemCacheLog() const;
std::shared_ptr<FilesystemReadPrefetchesLog> getFilesystemReadPrefetchesLog() const;
std::shared_ptr<BlobStorageLog> getBlobStorageLog() const;
enum class ApplicationType
{

View File

@ -60,9 +60,9 @@ public:
void createDirectories(const String & path) override
{
auto tx = createEncryptedTransaction();
tx->createDirectories(path);
tx->commit();
auto wrapped_path = wrappedPath(path);
/// Delegate disk can have retry logic for recursive directory creation. Let it handle it.
delegate->createDirectories(wrapped_path);
}
void clearDirectory(const String & path) override

View File

@ -209,7 +209,7 @@ void ReadBufferFromRemoteFSGather::setReadUntilPosition(size_t position)
void ReadBufferFromRemoteFSGather::reset()
{
current_object = {};
current_object = StoredObject();
current_buf_idx = {};
current_buf.reset();
}

View File

@ -519,7 +519,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskObjectStorage::readFile(
return object_storage->readObjects(
storage_objects,
object_storage->getAdjustedSettingsFromMetadataFile(updateResourceLink(settings, getReadResourceName()), path),
updateResourceLink(settings, getReadResourceName()),
read_hint,
file_size);
}
@ -532,12 +532,9 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorage::writeFile(
{
LOG_TEST(log, "Write file: {}", path);
WriteSettings write_settings = updateResourceLink(settings, getWriteResourceName());
auto transaction = createObjectStorageTransaction();
return transaction->writeFile(
path,
buf_size,
mode,
object_storage->getAdjustedSettingsFromMetadataFile(updateResourceLink(settings, getWriteResourceName()), path));
return transaction->writeFile(path, buf_size, mode, write_settings);
}
Strings DiskObjectStorage::getBlobPath(const String & path) const

View File

@ -684,7 +684,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorageTransaction::writeFile
}
/// seems ok
auto object = StoredObject(object_key.serialize());
auto object = StoredObject(object_key.serialize(), path);
std::function<void(size_t count)> create_metadata_callback;
if (autocommit)
@ -782,7 +782,7 @@ void DiskObjectStorageTransaction::writeFileUsingBlobWritingFunction(
}
/// seems ok
auto object = StoredObject(object_key.serialize());
auto object = StoredObject(object_key.serialize(), path);
auto write_operation = std::make_unique<WriteFileObjectStorageOperation>(object_storage, metadata_storage, object);
operations_to_execute.emplace_back(std::move(write_operation));

View File

@ -206,10 +206,6 @@ public:
virtual bool supportParallelWrite() const { return false; }
virtual ReadSettings getAdjustedSettingsFromMetadataFile(const ReadSettings & settings, const std::string & /* path */) const { return settings; }
virtual WriteSettings getAdjustedSettingsFromMetadataFile(const WriteSettings & settings, const std::string & /* path */) const { return settings; }
virtual ReadSettings patchSettings(const ReadSettings & read_settings) const;
virtual WriteSettings patchSettings(const WriteSettings & write_settings) const;

View File

@ -141,7 +141,7 @@ StoredObjects MetadataStorageFromDisk::getStorageObjects(const std::string & pat
objects.reserve(keys_with_meta.size());
for (const auto & [object_key, object_meta] : keys_with_meta)
{
objects.emplace_back(object_key.serialize(), object_meta.size_bytes, path);
objects.emplace_back(object_key.serialize(), path, object_meta.size_bytes);
}
return objects;

View File

@ -106,7 +106,7 @@ StoredObjects MetadataStorageFromPlainObjectStorage::getStorageObjects(const std
{
size_t object_size = getFileSize(path);
auto object_key = object_storage->generateObjectKeyForPath(path);
return {StoredObject(object_key.serialize(), object_size, path)};
return {StoredObject(object_key.serialize(), path, object_size)};
}
const IMetadataStorage & MetadataStorageFromPlainObjectStorageTransaction::getStorageForNonTransactionalReads() const

View File

@ -15,6 +15,8 @@
#include <IO/S3/copyS3File.h>
#include <Interpreters/Context.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <IO/S3/BlobStorageLogWriter.h>
#include <Disks/ObjectStorages/S3/diskSettings.h>
#include <Common/getRandomASCIIString.h>
@ -249,12 +251,18 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
if (write_settings.s3_allow_parallel_part_upload)
scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "VFSWrite");
auto blob_storage_log = BlobStorageLogWriter::create(disk_name);
if (blob_storage_log)
blob_storage_log->local_path = object.local_path;
return std::make_unique<WriteBufferFromS3>(
client.get(),
bucket,
object.remote_path,
buf_size,
settings_ptr->request_settings,
std::move(blob_storage_log),
attributes,
std::move(scheduler),
disk_write_settings);
@ -321,6 +329,10 @@ void S3ObjectStorage::removeObjectImpl(const StoredObject & object, bool if_exis
request.SetBucket(bucket);
request.SetKey(object.remote_path);
auto outcome = client.get()->DeleteObject(request);
if (auto blob_storage_log = BlobStorageLogWriter::create(disk_name))
blob_storage_log->addEvent(BlobStorageLogElement::EventType::Delete,
bucket, object.remote_path, object.local_path, object.bytes_size,
outcome.IsSuccess() ? nullptr : &outcome.GetError());
throwIfUnexpectedError(outcome, if_exists);
@ -344,6 +356,7 @@ void S3ObjectStorage::removeObjectsImpl(const StoredObjects & objects, bool if_e
size_t chunk_size_limit = settings_ptr->objects_chunk_size_to_delete;
size_t current_position = 0;
auto blob_storage_log = BlobStorageLogWriter::create(disk_name);
while (current_position < objects.size())
{
std::vector<Aws::S3::Model::ObjectIdentifier> current_chunk;
@ -369,9 +382,18 @@ void S3ObjectStorage::removeObjectsImpl(const StoredObjects & objects, bool if_e
request.SetDelete(delkeys);
auto outcome = client.get()->DeleteObjects(request);
throwIfUnexpectedError(outcome, if_exists);
if (blob_storage_log)
{
const auto * outcome_error = outcome.IsSuccess() ? nullptr : &outcome.GetError();
auto time_now = std::chrono::system_clock::now();
for (const auto & object : objects)
blob_storage_log->addEvent(BlobStorageLogElement::EventType::Delete,
bucket, object.remote_path, object.local_path, object.bytes_size,
outcome_error, time_now);
}
LOG_DEBUG(log, "Objects with paths [{}] were removed from S3", keys);
throwIfUnexpectedError(outcome, if_exists);
}
}
}
@ -450,6 +472,7 @@ void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
object_to.remote_path,
settings_ptr->request_settings,
patchSettings(read_settings),
BlobStorageLogWriter::create(disk_name),
object_to_attributes,
scheduler,
/* for_disk_s3= */ true);
@ -478,6 +501,7 @@ void S3ObjectStorage::copyObject( // NOLINT
object_to.remote_path,
settings_ptr->request_settings,
patchSettings(read_settings),
BlobStorageLogWriter::create(disk_name),
object_to_attributes,
scheduler,
/* for_disk_s3= */ true);
@ -520,7 +544,7 @@ std::unique_ptr<IObjectStorage> S3ObjectStorage::cloneObjectStorage(
return std::make_unique<S3ObjectStorage>(
std::move(new_client), std::move(new_s3_settings),
version_id, s3_capabilities, new_namespace,
endpoint, object_key_prefix);
endpoint, object_key_prefix, disk_name);
}
ObjectStorageKey S3ObjectStorage::generateObjectKeyForPath(const std::string &) const

View File

@ -50,9 +50,11 @@ private:
const S3Capabilities & s3_capabilities_,
String bucket_,
String connection_string,
String object_key_prefix_)
String object_key_prefix_,
const String & disk_name_)
: bucket(std::move(bucket_))
, object_key_prefix(std::move(object_key_prefix_))
, disk_name(disk_name_)
, client(std::move(client_))
, s3_settings(std::move(s3_settings_))
, s3_capabilities(s3_capabilities_)
@ -173,7 +175,7 @@ private:
private:
std::string bucket;
String object_key_prefix;
std::string disk_name;
MultiVersion<S3::Client> client;
MultiVersion<S3ObjectStorageSettings> s3_settings;

View File

@ -116,6 +116,7 @@ void registerDiskS3(DiskFactory & factory, bool global_skip_access_check)
MetadataStoragePtr metadata_storage;
auto settings = getSettings(config, config_prefix, context);
auto client = getClient(config, config_prefix, context, *settings);
if (type == "s3_plain")
{
/// send_metadata changes the filenames (includes revision), while
@ -127,14 +128,18 @@ void registerDiskS3(DiskFactory & factory, bool global_skip_access_check)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "s3_plain does not support send_metadata");
s3_storage = std::make_shared<S3PlainObjectStorage>(
std::move(client), std::move(settings), uri.version_id, s3_capabilities, uri.bucket, uri.endpoint, uri.key);
std::move(client), std::move(settings),
uri.version_id, s3_capabilities,
uri.bucket, uri.endpoint, uri.key, name);
metadata_storage = std::make_shared<MetadataStorageFromPlainObjectStorage>(s3_storage, uri.key);
}
else
{
s3_storage = std::make_shared<S3ObjectStorage>(
std::move(client), std::move(settings), uri.version_id, s3_capabilities, uri.bucket, uri.endpoint, uri.key);
std::move(client), std::move(settings),
uri.version_id, s3_capabilities,
uri.bucket, uri.endpoint, uri.key, name);
auto [metadata_path, metadata_disk] = prepareForLocalMetadata(name, config, config_prefix, context);
metadata_storage = std::make_shared<MetadataStorageFromDisk>(metadata_disk, uri.key);
}

View File

@ -19,23 +19,10 @@ struct StoredObject
uint64_t bytes_size = 0;
StoredObject() = default;
explicit StoredObject(String remote_path_)
: remote_path(std::move(remote_path_))
{}
StoredObject(
String remote_path_,
uint64_t bytes_size_)
: remote_path(std::move(remote_path_))
, bytes_size(bytes_size_)
{}
StoredObject(
String remote_path_,
uint64_t bytes_size_,
String local_path_)
explicit StoredObject(
const String & remote_path_ = "",
const String & local_path_ = "",
uint64_t bytes_size_ = 0)
: remote_path(std::move(remote_path_))
, local_path(std::move(local_path_))
, bytes_size(bytes_size_)

View File

@ -87,7 +87,7 @@ StoredObjects MetadataStorageFromStaticFilesWebServer::getStorageObjects(const s
remote_path = remote_path.substr(object_storage.url.size());
std::shared_lock shared_lock(object_storage.metadata_mutex);
return {StoredObject(remote_path, object_storage.files.at(path).size, path)};
return {StoredObject(remote_path, path, object_storage.files.at(path).size)};
}
std::vector<std::string> MetadataStorageFromStaticFilesWebServer::listDirectory(const std::string & path) const

View File

@ -207,6 +207,8 @@ public:
FunctionBasePtr buildImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & return_type) const override
{
if (arguments.size() == 1)
return FunctionFactory::instance().getImpl("toString", context)->build(arguments);
if (std::ranges::all_of(arguments, [](const auto & elem) { return isArray(elem.type); }))
return FunctionFactory::instance().getImpl("arrayConcat", context)->build(arguments);
if (std::ranges::all_of(arguments, [](const auto & elem) { return isMap(elem.type); }))
@ -221,10 +223,10 @@ public:
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (arguments.size() < 2)
if (arguments.empty())
throw Exception(
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Number of arguments for function {} doesn't match: passed {}, should be at least 2.",
"Number of arguments for function {} doesn't match: passed {}, should be at least 1.",
getName(),
arguments.size());

View File

@ -0,0 +1,72 @@
#include <IO/S3/BlobStorageLogWriter.h>
#if USE_AWS_S3
#include <base/getThreadId.h>
#include <Common/setThreadName.h>
#include <IO/S3/Client.h>
#include <Interpreters/Context.h>
namespace DB
{
void BlobStorageLogWriter::addEvent(
BlobStorageLogElement::EventType event_type,
const String & bucket,
const String & remote_path,
const String & local_path_,
size_t data_size,
const Aws::S3::S3Error * error,
BlobStorageLogElement::EvenTime time_now)
{
if (!log)
return;
if (!time_now.time_since_epoch().count())
time_now = std::chrono::system_clock::now();
BlobStorageLogElement element;
element.event_type = event_type;
element.query_id = query_id;
element.thread_id = getThreadId();
element.thread_name = getThreadName();
element.disk_name = disk_name;
element.bucket = bucket;
element.remote_path = remote_path;
element.local_path = local_path_.empty() ? local_path : local_path_;
element.data_size = data_size;
if (error)
{
element.error_code = static_cast<Int32>(error->GetErrorType());
element.error_message = error->GetMessage();
}
element.event_time = time_now;
log->add(element);
}
BlobStorageLogWriterPtr BlobStorageLogWriter::create(const String & disk_name)
{
#ifndef CLICKHOUSE_KEEPER_STANDALONE_BUILD /// Keeper standalone build doesn't have a context
if (auto blob_storage_log = Context::getGlobalContextInstance()->getBlobStorageLog())
{
auto log_writer = std::make_shared<BlobStorageLogWriter>(std::move(blob_storage_log));
log_writer->disk_name = disk_name;
if (CurrentThread::isInitialized() && CurrentThread::get().getQueryContext())
log_writer->query_id = CurrentThread::getQueryId();
return log_writer;
}
#endif
return {};
}
}
#endif

View File

@ -0,0 +1,57 @@
#pragma once
#include <Interpreters/BlobStorageLog.h>
#include "config.h"
#if USE_AWS_S3
namespace Aws::S3
{
class S3Error;
}
namespace DB
{
using BlobStorageLogPtr = std::shared_ptr<BlobStorageLog>;
class BlobStorageLogWriter;
using BlobStorageLogWriterPtr = std::shared_ptr<BlobStorageLogWriter>;
/// Helper class to write events to BlobStorageLog
/// Can additionally hold some context information
class BlobStorageLogWriter : private boost::noncopyable
{
public:
BlobStorageLogWriter() = default;
explicit BlobStorageLogWriter(BlobStorageLogPtr log_)
: log(std::move(log_))
{}
void addEvent(
BlobStorageLogElement::EventType event_type,
const String & bucket,
const String & remote_path,
const String & local_path,
size_t data_size,
const Aws::S3::S3Error * error,
BlobStorageLogElement::EvenTime time_now = {});
bool isInitialized() const { return log != nullptr; }
/// Optional context information
String disk_name;
String query_id;
String local_path;
static BlobStorageLogWriterPtr create(const String & disk_name = "");
private:
BlobStorageLogPtr log;
};
}
#endif

View File

@ -4,6 +4,7 @@
#include <Common/ProfileEvents.h>
#include <Common/typeid_cast.h>
#include <IO/S3/BlobStorageLogWriter.h>
#include <Interpreters/Context.h>
#include <IO/LimitSeekableReadBuffer.h>
#include <IO/S3/getObjectInfo.h>
@ -59,6 +60,7 @@ namespace
const std::optional<std::map<String, String>> & object_metadata_,
ThreadPoolCallbackRunner<void> schedule_,
bool for_disk_s3_,
BlobStorageLogWriterPtr blob_storage_log_,
const Poco::Logger * log_)
: client_ptr(client_ptr_)
, dest_bucket(dest_bucket_)
@ -68,6 +70,7 @@ namespace
, object_metadata(object_metadata_)
, schedule(schedule_)
, for_disk_s3(for_disk_s3_)
, blob_storage_log(blob_storage_log_)
, log(log_)
{
}
@ -83,6 +86,7 @@ namespace
const std::optional<std::map<String, String>> & object_metadata;
ThreadPoolCallbackRunner<void> schedule;
bool for_disk_s3;
BlobStorageLogWriterPtr blob_storage_log;
const Poco::Logger * log;
struct UploadPartTask
@ -132,6 +136,10 @@ namespace
ProfileEvents::increment(ProfileEvents::DiskS3CreateMultipartUpload);
auto outcome = client_ptr->CreateMultipartUpload(request);
if (blob_storage_log)
blob_storage_log->addEvent(BlobStorageLogElement::EventType::MultiPartUploadCreate,
dest_bucket, dest_key, /* local_path_ */ {}, /* data_size */ 0,
outcome.IsSuccess() ? nullptr : &outcome.GetError());
if (outcome.IsSuccess())
{
@ -178,6 +186,16 @@ namespace
auto outcome = client_ptr->CompleteMultipartUpload(request);
if (blob_storage_log)
blob_storage_log->addEvent(BlobStorageLogElement::EventType::MultiPartUploadComplete,
dest_bucket, dest_key, /* local_path_ */ {}, /* data_size */ 0,
outcome.IsSuccess() ? nullptr : &outcome.GetError());
if (outcome.IsSuccess())
{
LOG_TRACE(log, "Multipart upload has completed. Bucket: {}, Key: {}, Upload_id: {}, Parts: {}", dest_bucket, dest_key, multipart_upload_id, part_tags.size());
@ -206,7 +224,12 @@ namespace
abort_request.SetBucket(dest_bucket);
abort_request.SetKey(dest_key);
abort_request.SetUploadId(multipart_upload_id);
client_ptr->AbortMultipartUpload(abort_request);
auto outcome = client_ptr->AbortMultipartUpload(abort_request);
if (blob_storage_log)
blob_storage_log->addEvent(BlobStorageLogElement::EventType::MultiPartUploadAbort,
dest_bucket, dest_key, /* local_path_ */ {}, /* data_size */ 0,
outcome.IsSuccess() ? nullptr : &outcome.GetError());
multipart_upload_aborted = true;
}
@ -435,8 +458,9 @@ namespace
const S3Settings::RequestSettings & request_settings_,
const std::optional<std::map<String, String>> & object_metadata_,
ThreadPoolCallbackRunner<void> schedule_,
bool for_disk_s3_)
: UploadHelper(client_ptr_, dest_bucket_, dest_key_, request_settings_, object_metadata_, schedule_, for_disk_s3_, &Poco::Logger::get("copyDataToS3File"))
bool for_disk_s3_,
BlobStorageLogWriterPtr blob_storage_log_)
: UploadHelper(client_ptr_, dest_bucket_, dest_key_, request_settings_, object_metadata_, schedule_, for_disk_s3_, blob_storage_log_, &Poco::Logger::get("copyDataToS3File"))
, create_read_buffer(create_read_buffer_)
, offset(offset_)
, size(size_)
@ -500,6 +524,10 @@ namespace
Stopwatch watch;
auto outcome = client_ptr->PutObject(request);
watch.stop();
if (blob_storage_log)
blob_storage_log->addEvent(BlobStorageLogElement::EventType::Upload,
dest_bucket, dest_key, /* local_path_ */ {}, size,
outcome.IsSuccess() ? nullptr : &outcome.GetError());
if (outcome.IsSuccess())
{
@ -581,6 +609,11 @@ namespace
ProfileEvents::increment(ProfileEvents::DiskS3UploadPart);
auto outcome = client_ptr->UploadPart(req);
if (blob_storage_log)
blob_storage_log->addEvent(BlobStorageLogElement::EventType::MultiPartUploadWrite,
dest_bucket, dest_key, /* local_path_ */ {}, size,
outcome.IsSuccess() ? nullptr : &outcome.GetError());
if (!outcome.IsSuccess())
{
abortMultipartUpload();
@ -608,8 +641,9 @@ namespace
const ReadSettings & read_settings_,
const std::optional<std::map<String, String>> & object_metadata_,
ThreadPoolCallbackRunner<void> schedule_,
bool for_disk_s3_)
: UploadHelper(client_ptr_, dest_bucket_, dest_key_, request_settings_, object_metadata_, schedule_, for_disk_s3_, &Poco::Logger::get("copyS3File"))
bool for_disk_s3_,
BlobStorageLogWriterPtr blob_storage_log_)
: UploadHelper(client_ptr_, dest_bucket_, dest_key_, request_settings_, object_metadata_, schedule_, for_disk_s3_, blob_storage_log_, &Poco::Logger::get("copyS3File"))
, src_bucket(src_bucket_)
, src_key(src_key_)
, offset(src_offset_)
@ -712,6 +746,7 @@ namespace
dest_bucket,
dest_key,
request_settings,
blob_storage_log,
object_metadata,
schedule,
for_disk_s3);
@ -803,11 +838,12 @@ void copyDataToS3File(
const String & dest_bucket,
const String & dest_key,
const S3Settings::RequestSettings & settings,
BlobStorageLogWriterPtr blob_storage_log,
const std::optional<std::map<String, String>> & object_metadata,
ThreadPoolCallbackRunner<void> schedule,
bool for_disk_s3)
{
CopyDataToFileHelper helper{create_read_buffer, offset, size, dest_s3_client, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_s3};
CopyDataToFileHelper helper{create_read_buffer, offset, size, dest_s3_client, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_s3, blob_storage_log};
helper.performCopy();
}
@ -822,13 +858,14 @@ void copyS3File(
const String & dest_key,
const S3Settings::RequestSettings & settings,
const ReadSettings & read_settings,
BlobStorageLogWriterPtr blob_storage_log,
const std::optional<std::map<String, String>> & object_metadata,
ThreadPoolCallbackRunner<void> schedule,
bool for_disk_s3)
{
if (settings.allow_native_copy)
{
CopyFileHelper helper{s3_client, src_bucket, src_key, src_offset, src_size, dest_bucket, dest_key, settings, read_settings, object_metadata, schedule, for_disk_s3};
CopyFileHelper helper{s3_client, src_bucket, src_key, src_offset, src_size, dest_bucket, dest_key, settings, read_settings, object_metadata, schedule, for_disk_s3, blob_storage_log};
helper.performCopy();
}
else
@ -837,7 +874,7 @@ void copyS3File(
{
return std::make_unique<ReadBufferFromS3>(s3_client, src_bucket, src_key, "", settings, read_settings);
};
copyDataToS3File(create_read_buffer, src_offset, src_size, s3_client, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_s3);
copyDataToS3File(create_read_buffer, src_offset, src_size, s3_client, dest_bucket, dest_key, settings, blob_storage_log, object_metadata, schedule, for_disk_s3);
}
}

View File

@ -6,6 +6,7 @@
#include <Storages/StorageS3Settings.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <IO/S3/BlobStorageLogWriter.h>
#include <base/types.h>
#include <functional>
#include <memory>
@ -38,6 +39,7 @@ void copyS3File(
const String & dest_key,
const S3Settings::RequestSettings & settings,
const ReadSettings & read_settings,
BlobStorageLogWriterPtr blob_storage_log,
const std::optional<std::map<String, String>> & object_metadata = std::nullopt,
ThreadPoolCallbackRunner<void> schedule_ = {},
bool for_disk_s3 = false);
@ -55,6 +57,7 @@ void copyDataToS3File(
const String & dest_bucket,
const String & dest_key,
const S3Settings::RequestSettings & settings,
BlobStorageLogWriterPtr blob_storage_log,
const std::optional<std::map<String, String>> & object_metadata = std::nullopt,
ThreadPoolCallbackRunner<void> schedule_ = {},
bool for_disk_s3 = false);

View File

@ -95,7 +95,8 @@ void doWriteRequest(std::shared_ptr<const DB::S3::Client> client, const DB::S3::
uri.bucket,
uri.key,
DBMS_DEFAULT_BUFFER_SIZE,
request_settings
request_settings,
{}
);
write_buffer.write('\0'); // doesn't matter what we write here, just needs to be something

View File

@ -16,7 +16,7 @@
#include <IO/S3Common.h>
#include <IO/S3/Requests.h>
#include <IO/S3/getObjectInfo.h>
#include <Interpreters/Context.h>
#include <IO/S3/BlobStorageLogWriter.h>
#include <aws/s3/model/StorageClass.h>
@ -81,6 +81,7 @@ WriteBufferFromS3::WriteBufferFromS3(
const String & key_,
size_t buf_size_,
const S3Settings::RequestSettings & request_settings_,
BlobStorageLogWriterPtr blob_log_,
std::optional<std::map<String, String>> object_metadata_,
ThreadPoolCallbackRunner<void> schedule_,
const WriteSettings & write_settings_)
@ -98,6 +99,7 @@ WriteBufferFromS3::WriteBufferFromS3(
std::move(schedule_),
upload_settings.max_inflight_parts_for_one_file,
limitedLog))
, blob_log(std::move(blob_log_))
{
LOG_TRACE(limitedLog, "Create WriteBufferFromS3, {}", getShortLogDetails());
@ -378,6 +380,9 @@ void WriteBufferFromS3::createMultipartUpload()
watch.stop();
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds());
if (blob_log)
blob_log->addEvent(BlobStorageLogElement::EventType::MultiPartUploadCreate, bucket, key, {}, 0,
outcome.IsSuccess() ? nullptr : &outcome.GetError());
if (!outcome.IsSuccess())
{
@ -386,6 +391,7 @@ void WriteBufferFromS3::createMultipartUpload()
}
multipart_upload_id = outcome.GetResult().GetUploadId();
LOG_TRACE(limitedLog, "Multipart upload has been created. {}", getShortLogDetails());
}
@ -414,6 +420,10 @@ void WriteBufferFromS3::abortMultipartUpload()
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds());
if (blob_log)
blob_log->addEvent(BlobStorageLogElement::EventType::MultiPartUploadAbort, bucket, key, {}, 0,
outcome.IsSuccess() ? nullptr : &outcome.GetError());
if (!outcome.IsSuccess())
{
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1);
@ -508,6 +518,13 @@ void WriteBufferFromS3::writePart(WriteBufferFromS3::PartData && data)
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds());
if (blob_log)
{
blob_log->addEvent(BlobStorageLogElement::EventType::MultiPartUploadWrite,
/* bucket = */ bucket, /* remote_path = */ key, /* local_path = */ {}, /* data_size */ data_size,
outcome.IsSuccess() ? nullptr : &outcome.GetError());
}
if (!outcome.IsSuccess())
{
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1);
@ -569,6 +586,10 @@ void WriteBufferFromS3::completeMultipartUpload()
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds());
if (blob_log)
blob_log->addEvent(BlobStorageLogElement::EventType::MultiPartUploadComplete, bucket, key, {}, 0,
outcome.IsSuccess() ? nullptr : &outcome.GetError());
if (outcome.IsSuccess())
{
LOG_TRACE(limitedLog, "Multipart upload has completed. {}, Parts: {}", getShortLogDetails(), multipart_tags.size());
@ -650,6 +671,9 @@ void WriteBufferFromS3::makeSinglepartUpload(WriteBufferFromS3::PartData && data
rlock.unlock();
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds());
if (blob_log)
blob_log->addEvent(BlobStorageLogElement::EventType::Upload, bucket, key, {}, request.GetContentLength(),
outcome.IsSuccess() ? nullptr : &outcome.GetError());
if (outcome.IsSuccess())
{

View File

@ -11,6 +11,7 @@
#include <IO/WriteSettings.h>
#include <Storages/StorageS3Settings.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <IO/S3/BlobStorageLogWriter.h>
#include <memory>
#include <vector>
@ -34,6 +35,7 @@ public:
const String & key_,
size_t buf_size_,
const S3Settings::RequestSettings & request_settings_,
BlobStorageLogWriterPtr blob_log_,
std::optional<std::map<String, String>> object_metadata_ = std::nullopt,
ThreadPoolCallbackRunner<void> schedule_ = {},
const WriteSettings & write_settings_ = {});
@ -118,6 +120,8 @@ private:
class TaskTracker;
std::unique_ptr<TaskTracker> task_tracker;
BlobStorageLogWriterPtr blob_log;
};
}

View File

@ -554,6 +554,7 @@ public:
file_name,
DBMS_DEFAULT_BUFFER_SIZE,
request_settings,
nullptr,
std::nullopt,
getAsyncPolicy().getScheduler());
}
@ -1214,7 +1215,7 @@ TEST_F(WBS3Test, ReadBeyondLastOffset) {
/// create encrypted file reader
auto cache_log = std::shared_ptr<FilesystemCacheLog>();
const StoredObjects objects = { StoredObject(remote_file, data.size() + FileEncryption::Header::kSize) };
const StoredObjects objects = { StoredObject(remote_file, /* local_path */ "", data.size() + FileEncryption::Header::kSize) };
auto async_read_counters = std::make_shared<AsyncReadCounters>();
auto prefetch_log = std::shared_ptr<FilesystemReadPrefetchesLog>();

View File

@ -0,0 +1,92 @@
#include <Interpreters/BlobStorageLog.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeDate.h>
namespace DB
{
NamesAndTypesList BlobStorageLogElement::getNamesAndTypes()
{
auto event_enum_type = std::make_shared<DataTypeEnum8>(
DataTypeEnum8::Values{
{"Upload", static_cast<Int8>(EventType::Upload)},
{"Delete", static_cast<Int8>(EventType::Delete)},
{"MultiPartUploadCreate", static_cast<Int8>(EventType::MultiPartUploadCreate)},
{"MultiPartUploadWrite", static_cast<Int8>(EventType::MultiPartUploadWrite)},
{"MultiPartUploadComplete", static_cast<Int8>(EventType::MultiPartUploadComplete)},
{"MultiPartUploadAbort", static_cast<Int8>(EventType::MultiPartUploadAbort)},
});
return {
{"event_date", std::make_shared<DataTypeDate>()},
{"event_time", std::make_shared<DataTypeDateTime>()},
{"event_time_microseconds", std::make_shared<DataTypeDateTime64>(6)},
{"event_type", event_enum_type},
{"query_id", std::make_shared<DataTypeString>()},
{"thread_id", std::make_shared<DataTypeUInt64>()},
{"thread_name", std::make_shared<DataTypeString>()},
{"disk_name", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"bucket", std::make_shared<DataTypeString>()},
{"remote_path", std::make_shared<DataTypeString>()},
{"local_path", std::make_shared<DataTypeString>()},
{"data_size", std::make_shared<DataTypeUInt64>()},
{"error", std::make_shared<DataTypeString>()},
};
}
void BlobStorageLogElement::appendToBlock(MutableColumns & columns) const
{
#ifndef NDEBUG
auto coulumn_names = BlobStorageLogElement::getNamesAndTypes().getNames();
#endif
size_t i = 0;
auto event_time_seconds = timeInSeconds(event_time);
assert(coulumn_names.at(i) == "event_date");
columns[i++]->insert(DateLUT::instance().toDayNum(event_time_seconds).toUnderType());
assert(coulumn_names.at(i) == "event_time");
columns[i++]->insert(event_time_seconds);
assert(coulumn_names.at(i) == "event_time_microseconds");
columns[i++]->insert(Decimal64(timeInMicroseconds(event_time)));
assert(coulumn_names.at(i) == "event_type");
columns[i++]->insert(static_cast<Int8>(event_type));
assert(coulumn_names.at(i) == "query_id");
columns[i++]->insert(query_id);
assert(coulumn_names.at(i) == "thread_id");
columns[i++]->insert(thread_id);
assert(coulumn_names.at(i) == "thread_name");
columns[i++]->insert(thread_name);
assert(coulumn_names.at(i) == "disk_name");
columns[i++]->insert(disk_name);
assert(coulumn_names.at(i) == "bucket");
columns[i++]->insert(bucket);
assert(coulumn_names.at(i) == "remote_path");
columns[i++]->insert(remote_path);
assert(coulumn_names.at(i) == "local_path");
columns[i++]->insert(local_path);
assert(coulumn_names.at(i) == "data_size");
columns[i++]->insert(data_size);
assert(coulumn_names.at(i) == "error");
columns[i++]->insert(error_message);
assert(i == coulumn_names.size() && columns.size() == coulumn_names.size());
}
}

View File

@ -0,0 +1,57 @@
#pragma once
#include <Interpreters/SystemLog.h>
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>
#include <Poco/Message.h>
#include <chrono>
namespace DB
{
struct BlobStorageLogElement
{
enum class EventType : Int8
{
Upload = 1,
Delete = 2,
MultiPartUploadCreate = 3,
MultiPartUploadWrite = 4,
MultiPartUploadComplete = 5,
MultiPartUploadAbort = 6,
};
EventType event_type;
String query_id;
UInt64 thread_id = 0;
String thread_name;
String disk_name;
String bucket;
String remote_path;
String local_path;
size_t data_size;
Int32 error_code = -1; /// negative if no error
String error_message;
using EvenTime = std::chrono::time_point<std::chrono::system_clock>;
EvenTime event_time;
static std::string name() { return "BlobStorageLog"; }
static NamesAndTypesList getNamesAndTypes();
static NamesAndAliases getNamesAndAliases() { return {}; }
void appendToBlock(MutableColumns & columns) const;
static const char * getCustomColumnList() { return nullptr; }
};
class BlobStorageLog : public SystemLog<BlobStorageLogElement>
{
using SystemLog<BlobStorageLogElement>::SystemLog;
};
}
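With the log element and table defined, a quick aggregate over the new system table gives an overview of blob traffic; a sketch that uses only the columns declared above:

```sql
SELECT event_type, count() AS events, sum(data_size) AS bytes
FROM system.blob_storage_log
WHERE event_date = today()
GROUP BY event_type
ORDER BY events DESC;
```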

View File

@ -3667,16 +3667,25 @@ std::shared_ptr<BackupLog> Context::getBackupLog() const
return shared->system_logs->backup_log;
}
std::shared_ptr<BlobStorageLog> Context::getBlobStorageLog() const
{
SharedLockGuard lock(shared->mutex);
if (!shared->system_logs)
return {};
return shared->system_logs->blob_storage_log;
}
std::vector<ISystemLog *> Context::getSystemLogs() const
{
SharedLockGuard lock(shared->mutex);
if (!shared->system_logs)
return {};
return shared->system_logs->logs;
}
CompressionCodecPtr Context::chooseCompressionCodec(size_t part_size, double part_size_ratio) const
{
std::lock_guard lock(shared->mutex);

View File

@ -107,6 +107,7 @@ class FilesystemReadPrefetchesLog;
class S3QueueLog;
class AsynchronousInsertLog;
class BackupLog;
class BlobStorageLog;
class IAsynchronousReader;
struct MergeTreeSettings;
struct InitialAllRangesAnnouncement;
@ -1057,6 +1058,7 @@ public:
std::shared_ptr<FilesystemReadPrefetchesLog> getFilesystemReadPrefetchesLog() const;
std::shared_ptr<AsynchronousInsertLog> getAsynchronousInsertLog() const;
std::shared_ptr<BackupLog> getBackupLog() const;
std::shared_ptr<BlobStorageLog> getBlobStorageLog() const;
std::vector<ISystemLog *> getSystemLogs() const;

View File

@ -35,6 +35,7 @@
#include <Interpreters/ProcessorsProfileLog.h>
#include <Interpreters/AsynchronousInsertLog.h>
#include <Interpreters/BackupLog.h>
#include <IO/S3/BlobStorageLogWriter.h>
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Interpreters/TransactionLog.h>
#include <Interpreters/AsynchronousInsertQueue.h>

View File

@ -10,6 +10,7 @@
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Interpreters/PartLog.h>
#include <Interpreters/ProcessorsProfileLog.h>
#include <Interpreters/BlobStorageLog.h>
#include <Interpreters/QueryLog.h>
#include <Interpreters/QueryThreadLog.h>
#include <Interpreters/QueryViewsLog.h>
@ -291,6 +292,7 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf
asynchronous_insert_log = createSystemLog<AsynchronousInsertLog>(global_context, "system", "asynchronous_insert_log", config, "asynchronous_insert_log");
backup_log = createSystemLog<BackupLog>(global_context, "system", "backup_log", config, "backup_log");
s3_queue_log = createSystemLog<S3QueueLog>(global_context, "system", "s3queue_log", config, "s3queue_log");
blob_storage_log = createSystemLog<BlobStorageLog>(global_context, "system", "blob_storage_log", config, "blob_storage_log");
if (query_log)
logs.emplace_back(query_log.get());
@ -333,6 +335,8 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf
logs.emplace_back(backup_log.get());
if (s3_queue_log)
logs.emplace_back(s3_queue_log.get());
if (blob_storage_log)
logs.emplace_back(blob_storage_log.get());
try
{

View File

@ -51,6 +51,7 @@ class FilesystemReadPrefetchesLog;
class AsynchronousInsertLog;
class BackupLog;
class S3QueueLog;
class BlobStorageLog;
/// System logs should be destroyed in destructor of the last Context and before tables,
/// because SystemLog destruction makes insert query while flushing data into underlying tables
@ -89,6 +90,8 @@ struct SystemLogs
std::shared_ptr<AsynchronousInsertLog> asynchronous_insert_log;
/// Backup and restore events
std::shared_ptr<BackupLog> backup_log;
/// Log blob storage operations
std::shared_ptr<BlobStorageLog> blob_storage_log;
std::vector<ISystemLog *> logs;
};

View File

@ -734,6 +734,12 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
bool is_create_parameterized_view = false;
if (const auto * create_query = ast->as<ASTCreateQuery>())
is_create_parameterized_view = create_query->isParameterizedView();
else if (const auto * explain_query = ast->as<ASTExplainQuery>())
{
assert(!explain_query->children.empty());
if (const auto * create_of_explain_query = explain_query->children[0]->as<ASTCreateQuery>())
is_create_parameterized_view = create_of_explain_query->isParameterizedView();
}
/// Replace ASTQueryParameter with ASTLiteral for prepared statements.
/// Even if we don't have parameters in query_context, check that AST doesn't have unknown parameters
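A hedged illustration of the case this branch handles (view and parameter names are invented; any EXPLAIN over a parameterized CREATE should exercise it): previously, the unknown-parameter check could reject an EXPLAIN over a parameterized view definition.

```sql
EXPLAIN AST
CREATE VIEW hits_by_user AS
SELECT count()
FROM hits
WHERE user_id = {uid:UInt64};
```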

View File

@ -84,6 +84,7 @@ struct Settings;
M(UInt64, min_delay_to_insert_ms, 10, "Min delay of inserting data into MergeTree table in milliseconds, if there are a lot of unmerged parts in single partition.", 0) \
M(UInt64, max_parts_in_total, 100000, "If more than this number active parts in all partitions in total, throw 'Too many parts ...' exception.", 0) \
M(Bool, async_insert, false, "If true, data from INSERT query is stored in queue and later flushed to table in background.", 0) \
M(Bool, add_implicit_sign_column_constraint_for_collapsing_engine, false, "If true, add implicit constraint for sign column for CollapsingMergeTree engine.", 0) \
M(Milliseconds, sleep_before_commit_local_part_in_replicated_table_ms, 0, "For testing. Do not change it.", 0) \
\
/* Part removal settings. */ \

View File

@ -598,11 +598,6 @@ static StoragePtr create(const StorageFactory::Arguments & args)
metadata.projections.add(std::move(projection));
}
auto constraints = metadata.constraints.getConstraints();
if (args.query.columns_list && args.query.columns_list->constraints)
for (auto & constraint : args.query.columns_list->constraints->children)
constraints.push_back(constraint);
metadata.constraints = ConstraintsDescription(constraints);
auto column_ttl_asts = columns.getColumnTTLs();
for (const auto & [name, ast] : column_ttl_asts)
@ -620,6 +615,30 @@ static StoragePtr create(const StorageFactory::Arguments & args)
args.getLocalContext()->checkMergeTreeSettingsConstraints(initial_storage_settings, storage_settings->changes());
metadata.settings_changes = args.storage_def->settings->ptr();
}
auto constraints = metadata.constraints.getConstraints();
if (args.query.columns_list && args.query.columns_list->constraints)
for (auto & constraint : args.query.columns_list->constraints->children)
constraints.push_back(constraint);
if ((merging_params.mode == MergeTreeData::MergingParams::Collapsing ||
merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing) &&
storage_settings->add_implicit_sign_column_constraint_for_collapsing_engine)
{
auto sign_column_check_constraint = std::make_unique<ASTConstraintDeclaration>();
sign_column_check_constraint->name = "check_sign_column";
sign_column_check_constraint->type = ASTConstraintDeclaration::Type::CHECK;
Array valid_values_array;
valid_values_array.emplace_back(-1);
valid_values_array.emplace_back(1);
auto valid_values_ast = std::make_unique<ASTLiteral>(std::move(valid_values_array));
auto sign_column_ast = std::make_unique<ASTIdentifier>(merging_params.sign_column);
sign_column_check_constraint->set(sign_column_check_constraint->expr, makeASTFunction("in", std::move(sign_column_ast), std::move(valid_values_ast)));
constraints.push_back(std::move(sign_column_check_constraint));
}
metadata.constraints = ConstraintsDescription(constraints);
}
else
{
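For reference, the implicit constraint assembled by the AST-building code above is roughly equivalent to declaring the CHECK by hand. A minimal sketch follows; the table definition is illustrative, and only the constraint name `check_sign_column` and the `IN (-1, 1)` expression come from the code:

```sql
-- Sketch: with add_implicit_sign_column_constraint_for_collapsing_engine = 1,
-- the engine adds a constraint equivalent to writing this explicitly.
CREATE TABLE example_collapsing
(
    Key UInt32,
    Count UInt16,
    Sign Int8,
    CONSTRAINT check_sign_column CHECK Sign IN (-1, 1)
)
ENGINE = CollapsingMergeTree(Sign)
ORDER BY Key;
```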

View File

@ -243,11 +243,16 @@ std::shared_ptr<StorageS3QueueSource> StorageS3Queue::createSource(
configuration_snapshot.url.uri.getHost() + std::to_string(configuration_snapshot.url.uri.getPort()),
file_iterator, local_context->getSettingsRef().max_download_threads, false, /* query_info */ std::nullopt);
auto file_deleter = [this, bucket = configuration_snapshot.url.bucket, client = configuration_snapshot.client](const std::string & path)
auto file_deleter = [this, bucket = configuration_snapshot.url.bucket, client = configuration_snapshot.client, blob_storage_log = BlobStorageLogWriter::create()](const std::string & path) mutable
{
S3::DeleteObjectRequest request;
request.WithKey(path).WithBucket(bucket);
auto outcome = client->DeleteObject(request);
if (blob_storage_log)
blob_storage_log->addEvent(
BlobStorageLogElement::EventType::Delete,
bucket, path, {}, 0, outcome.IsSuccess() ? nullptr : &outcome.GetError());
if (!outcome.IsSuccess())
{
const auto & err = outcome.GetError();
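The Delete events recorded by the file deleter above end up in `system.blob_storage_log` once the log is enabled. A minimal sketch for inspecting failed deletions; the filter and limit are illustrative:

```sql
-- Sketch: list the most recent failed blob deletions.
SYSTEM FLUSH LOGS;

SELECT event_time, bucket, remote_path, error
FROM system.blob_storage_log
WHERE event_type = 'Delete' AND error != ''
ORDER BY event_time DESC
LIMIT 10;
```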

View File

@ -10,6 +10,8 @@
#include <Storages/S3Queue/S3QueueSource.h>
#include <Storages/StorageS3.h>
#include <Interpreters/Context.h>
#include <IO/S3/BlobStorageLogWriter.h>
namespace Aws::S3
{
@ -74,6 +76,7 @@ private:
std::atomic<bool> mv_attached = false;
std::atomic<bool> shutdown_called = false;
std::atomic<bool> table_is_being_dropped = false;
Poco::Logger * log;
void startup() override;

View File

@ -1384,7 +1384,7 @@ std::unique_ptr<ReadBuffer> StorageAzureBlobSource::createAsyncAzureReadBuffer(
{
auto modified_settings{read_settings};
modified_settings.remote_read_min_bytes_for_seek = modified_settings.remote_fs_buffer_size;
auto async_reader = object_storage->readObjects(StoredObjects{StoredObject{key, object_size}}, modified_settings);
auto async_reader = object_storage->readObjects(StoredObjects{StoredObject{key, /* local_path */ "", object_size}}, modified_settings);
async_reader->setReadUntilEnd();
if (read_settings.remote_fs_prefetch)

View File

@ -711,7 +711,7 @@ std::unique_ptr<ReadBuffer> StorageS3Source::createAsyncS3ReadBuffer(
auto s3_impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
StoredObjects{StoredObject{key, object_size}},
StoredObjects{StoredObject{key, /* local_path */ "", object_size}},
read_settings,
/* cache_log */nullptr, /* use_external_buffer */true);
@ -822,6 +822,13 @@ public:
, sample_block(sample_block_)
, format_settings(format_settings_)
{
BlobStorageLogWriterPtr blob_log = nullptr;
if (auto blob_storage_log = context->getBlobStorageLog())
{
blob_log = std::make_shared<BlobStorageLogWriter>(std::move(blob_storage_log));
blob_log->query_id = context->getCurrentQueryId();
}
write_buf = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromS3>(
configuration_.client,
@ -829,6 +836,7 @@ public:
key,
DBMS_DEFAULT_BUFFER_SIZE,
configuration_.request_settings,
std::move(blob_log),
std::nullopt,
threadPoolCallbackRunner<void>(getIOThreadPool().get(), "S3ParallelWrite"),
context->getWriteSettings()),
@ -1241,6 +1249,15 @@ void StorageS3::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &,
request.SetDelete(delkeys);
auto response = query_configuration.client->DeleteObjects(request);
const auto * response_error = response.IsSuccess() ? nullptr : &response.GetError();
auto time_now = std::chrono::system_clock::now();
if (auto blob_storage_log = BlobStorageLogWriter::create())
{
for (const auto & key : query_configuration.keys)
blob_storage_log->addEvent(BlobStorageLogElement::EventType::Delete, query_configuration.url.bucket, key, {}, 0, response_error, time_now);
}
if (!response.IsSuccess())
{
const auto & err = response.GetError();

View File

@ -24,6 +24,7 @@
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageConfiguration.h>
#include <Storages/prepareReadingFromFormat.h>
#include <IO/S3/BlobStorageLogWriter.h>
namespace Aws::S3
{

View File

@ -150,7 +150,7 @@ def main():
DOCS_NAME,
pr_info,
)
sys.exit(1)
sys.exit(0)
if description_error:
print(

View File

@ -0,0 +1,9 @@
<clickhouse>
<blob_storage_log>
<database>system</database>
<table>blob_storage_log</table>
<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<ttl>event_date + INTERVAL 30 DAY</ttl>
</blob_storage_log>
</clickhouse>
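To confirm the server picked up this configuration, the table definition can be checked once at least one event has been flushed (system log tables are created lazily). A minimal sketch:

```sql
-- Sketch: the engine definition should reflect the toYYYYMM(event_date)
-- partitioning and the 30-day TTL configured above.
SYSTEM FLUSH LOGS;
SHOW CREATE TABLE system.blob_storage_log;
```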

View File

@ -18,6 +18,7 @@ mkdir -p $DEST_CLIENT_PATH
ln -sf $SRC_PATH/config.d/zookeeper_write.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/listen.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/text_log.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/blob_storage_log.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/custom_settings_prefixes.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/enable_access_control_improvements.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/macros.xml $DEST_SERVER_PATH/config.d/

View File

@ -214,7 +214,13 @@ def test_create_or_drop_tables_during_backup(db_engine, table_engine):
while time.time() < end_time:
table_name = f"mydb.tbl{randint(1, num_nodes)}"
node = nodes[randint(0, num_nodes - 1)]
node.query(f"DROP TABLE IF EXISTS {table_name} SYNC")
# "DROP TABLE IF EXISTS" still can throw some errors (e.g. "WRITE locking attempt on node0 has timed out!")
# So we use query_and_get_answer_with_error() to ignore any errors.
# `lock_acquire_timeout` is also reduced because we don't wait our test to wait too long.
node.query_and_get_answer_with_error(
f"DROP TABLE IF EXISTS {table_name} SYNC",
settings={"lock_acquire_timeout": 10},
)
def rename_tables():
while time.time() < end_time:

View File

@ -0,0 +1,9 @@
<clickhouse>
<blob_storage_log>
<database>system</database>
<table>blob_storage_log</table>
<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<ttl>event_date + INTERVAL 30 DAY</ttl>
</blob_storage_log>
</clickhouse>

View File

@ -12,6 +12,7 @@ node = cluster.add_instance(
"configs/disk_s3.xml",
"configs/named_collection_s3_backups.xml",
"configs/s3_settings.xml",
"configs/blob_log.xml",
],
user_configs=[
"configs/zookeeper_retries.xml",
@ -51,10 +52,12 @@ def get_events_for_query(query_id: str) -> Dict[str, int]:
"""
)
)
return {
result = {
event: int(value)
for event, value in [line.split("\t") for line in events.lines]
}
result["query_id"] = query_id
return result
def format_settings(settings):
@ -118,7 +121,7 @@ def check_backup_and_restore(
)
def check_system_tables():
def check_system_tables(backup_query_id=None):
disks = [
tuple(disk.split("\t"))
for disk in node.query("SELECT name, type FROM system.disks").split("\n")
@ -136,6 +139,14 @@ def check_system_tables():
if expected_disk not in disks:
raise AssertionError(f"Missed {expected_disk} in {disks}")
if backup_query_id is not None:
blob_storage_log = node.query(
f"SELECT count() FROM system.blob_storage_log WHERE query_id = '{backup_query_id}' AND error = '' AND event_type = 'Upload'"
).strip()
assert int(blob_storage_log) >= 1, node.query(
"SELECT * FROM system.blob_storage_log FORMAT PrettyCompactMonoBlock"
)
@pytest.mark.parametrize(
"storage_policy, to_disk",
@ -179,8 +190,8 @@ def test_backup_to_s3():
backup_destination = (
f"S3('http://minio1:9001/root/data/backups/{backup_name}', 'minio', 'minio123')"
)
check_backup_and_restore(storage_policy, backup_destination)
check_system_tables()
(backup_events, _) = check_backup_and_restore(storage_policy, backup_destination)
check_system_tables(backup_events["query_id"])
def test_backup_to_s3_named_collection():
@ -203,6 +214,15 @@ def test_backup_to_s3_multipart():
f"copyDataToS3File: Multipart upload has completed. Bucket: root, Key: data/backups/multipart/{backup_name}"
)
backup_query_id = backup_events["query_id"]
blob_storage_log = node.query(
f"SELECT countIf(event_type == 'MultiPartUploadCreate') * countIf(event_type == 'MultiPartUploadComplete') * countIf(event_type == 'MultiPartUploadWrite') "
f"FROM system.blob_storage_log WHERE query_id = '{backup_query_id}' AND error = ''"
).strip()
assert int(blob_storage_log) >= 1, node.query(
"SELECT * FROM system.blob_storage_log FORMAT PrettyCompactMonoBlock"
)
s3_backup_events = (
"WriteBufferFromS3Microseconds",
"WriteBufferFromS3Bytes",

View File

@ -0,0 +1,9 @@
<clickhouse>
<blob_storage_log>
<database>system</database>
<table>blob_storage_log</table>
<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<ttl>event_date + INTERVAL 30 DAY</ttl>
</blob_storage_log>
</clickhouse>

View File

@ -1,6 +1,7 @@
import logging
import time
import os
import uuid
import pytest
from helpers.cluster import ClickHouseCluster
@ -10,7 +11,6 @@ from helpers.wait_for_helpers import wait_for_delete_inactive_parts
from helpers.wait_for_helpers import wait_for_delete_empty_parts
from helpers.wait_for_helpers import wait_for_merges
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@ -24,6 +24,7 @@ def cluster():
"configs/config.xml",
"configs/config.d/storage_conf.xml",
"configs/config.d/bg_processing_pool_conf.xml",
"configs/config.d/blob_log.xml",
],
user_configs=[
"configs/config.d/users.xml",
@ -37,6 +38,7 @@ def cluster():
main_configs=[
"configs/config.d/storage_conf.xml",
"configs/config.d/bg_processing_pool_conf.xml",
"configs/config.d/blob_log.xml",
],
with_minio=True,
tmpfs=[
@ -126,17 +128,22 @@ def list_objects(cluster, path="data/", hint="list_objects"):
def wait_for_delete_s3_objects(cluster, expected, timeout=30):
while timeout > 0:
if len(list_objects(cluster, "data/")) == expected:
return
existing_objects = list_objects(cluster, "data/")
if len(existing_objects) == expected:
return existing_objects
timeout -= 1
time.sleep(1)
assert len(list_objects(cluster, "data/")) == expected
existing_objects = list_objects(cluster, "data/")
assert len(existing_objects) == expected
return existing_objects
def remove_all_s3_objects(cluster):
minio = cluster.minio_client
for obj in list_objects(cluster, "data/"):
objects_to_delete = list_objects(cluster, "data/")
for obj in objects_to_delete:
minio.remove_object(cluster.minio_bucket, obj.object_name)
return objects_to_delete
@pytest.fixture(autouse=True, scope="function")
@ -155,7 +162,7 @@ def clear_minio(cluster):
def check_no_objects_after_drop(cluster, table_name="s3_test", node_name="node"):
node = cluster.instances[node_name]
node.query(f"DROP TABLE IF EXISTS {table_name} SYNC")
wait_for_delete_s3_objects(cluster, 0, timeout=0)
return wait_for_delete_s3_objects(cluster, 0, timeout=0)
@pytest.mark.parametrize(
@ -173,10 +180,32 @@ def test_simple_insert_select(
minio = cluster.minio_client
values1 = generate_values("2020-01-03", 4096)
node.query("INSERT INTO s3_test VALUES {}".format(values1))
insert_query_id = uuid.uuid4().hex
node.query(
"INSERT INTO s3_test VALUES {}".format(values1), query_id=insert_query_id
)
assert node.query("SELECT * FROM s3_test order by dt, id FORMAT Values") == values1
assert len(list_objects(cluster, "data/")) == FILES_OVERHEAD + files_per_part
node.query("SYSTEM FLUSH LOGS")
blob_storage_log = node.query(
f"SELECT * FROM system.blob_storage_log WHERE query_id = '{insert_query_id}' FORMAT PrettyCompactMonoBlock"
)
result = node.query(
f"""SELECT
(countIf( (event_type == 'Upload' OR event_type == 'MultiPartUploadWrite') as event_match) as total_events) > 0,
countIf(event_match AND bucket == 'root') == total_events,
countIf(event_match AND remote_path != '') == total_events,
countIf(event_match AND local_path != '') == total_events,
sumIf(data_size, event_match) > 0
FROM system.blob_storage_log
WHERE query_id = '{insert_query_id}' AND error == ''
"""
)
assert result == "1\t1\t1\t1\t1\n", blob_storage_log
values2 = generate_values("2020-01-04", 4096)
node.query("INSERT INTO s3_test VALUES {}".format(values2))
assert (
@ -269,6 +298,30 @@ def test_alter_table_columns(cluster, node_name):
"INSERT INTO s3_test VALUES {}".format(generate_values("2020-01-03", 4096, -1))
)
def assert_deleted_in_log(old_objects, new_objects):
node.query("SYSTEM FLUSH LOGS")
deleted_objects = set(obj.object_name for obj in old_objects) - set(
obj.object_name for obj in new_objects
)
deleted_in_log = set(
node.query(
f"SELECT remote_path FROM system.blob_storage_log WHERE error == '' AND event_type == 'Delete'"
)
.strip()
.split()
)
# all deleted objects should appear in the log
assert all(obj in deleted_in_log for obj in deleted_objects), (
deleted_objects,
node.query(
f"SELECT * FROM system.blob_storage_log FORMAT PrettyCompactMonoBlock"
),
)
objects_before = list_objects(cluster, "data/")
node.query("ALTER TABLE s3_test ADD COLUMN col1 UInt64 DEFAULT 1")
# To ensure parts have merged
node.query("OPTIMIZE TABLE s3_test")
@ -278,30 +331,42 @@ def test_alter_table_columns(cluster, node_name):
node.query("SELECT sum(col1) FROM s3_test WHERE id > 0 FORMAT Values")
== "(4096)"
)
wait_for_delete_s3_objects(
existing_objects = wait_for_delete_s3_objects(
cluster,
FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + FILES_OVERHEAD_PER_COLUMN,
)
assert_deleted_in_log(objects_before, existing_objects)
objects_before = existing_objects
node.query(
"ALTER TABLE s3_test MODIFY COLUMN col1 String", settings={"mutations_sync": 2}
)
assert node.query("SELECT distinct(col1) FROM s3_test FORMAT Values") == "('1')"
# and file with mutation
wait_for_delete_s3_objects(
existing_objects = wait_for_delete_s3_objects(
cluster,
FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + FILES_OVERHEAD_PER_COLUMN + 1,
)
assert_deleted_in_log(objects_before, existing_objects)
objects_before = existing_objects
node.query("ALTER TABLE s3_test DROP COLUMN col1", settings={"mutations_sync": 2})
# and 2 files with mutations
wait_for_delete_s3_objects(
existing_objects = wait_for_delete_s3_objects(
cluster, FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + 2
)
assert_deleted_in_log(objects_before, existing_objects)
objects_before = existing_objects
check_no_objects_after_drop(cluster)
existing_objects = check_no_objects_after_drop(cluster)
assert_deleted_in_log(objects_before, existing_objects)
objects_before = existing_objects
@pytest.mark.parametrize("node_name", ["node"])
@ -796,6 +861,18 @@ def test_merge_canceled_by_s3_errors(cluster, broken_s3, node_name, storage_poli
node.wait_for_log_line("ExpectedError Message: mock s3 injected error")
table_uuid = node.query(
"SELECT uuid FROM system.tables WHERE database = 'default' AND name = 'test_merge_canceled_by_s3_errors' LIMIT 1"
).strip()
node.query("SYSTEM FLUSH LOGS")
error_count_in_blob_log = node.query(
f"SELECT count() FROM system.blob_storage_log WHERE query_id like '{table_uuid}::%' AND error like '%mock s3 injected error%'"
).strip()
assert int(error_count_in_blob_log) > 0, node.query(
f"SELECT * FROM system.blob_storage_log WHERE query_id like '{table_uuid}::%' FORMAT PrettyCompactMonoBlock"
)
check_no_objects_after_drop(
cluster, table_name="test_merge_canceled_by_s3_errors", node_name=node_name
)

View File

@ -110,7 +110,7 @@ def rabbitmq_setup_teardown():
],
)
def test_rabbitmq_select(rabbitmq_cluster, secure):
if secure and instance.is_built_with_memory_sanitizer():
if secure and instance.is_built_with_thread_sanitizer():
pytest.skip(
"Data races: see https://github.com/ClickHouse/ClickHouse/issues/56866"
)

View File

@ -0,0 +1,9 @@
<clickhouse>
<blob_storage_log>
<database>system</database>
<table>blob_storage_log</table>
<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<ttl>event_date + INTERVAL 30 DAY</ttl>
</blob_storage_log>
</clickhouse>

View File

@ -1,5 +1,5 @@
import gzip
import json
import uuid
import logging
import os
import io
@ -54,6 +54,7 @@ def started_cluster():
"configs/defaultS3.xml",
"configs/named_collections.xml",
"configs/schema_cache.xml",
"configs/blob_log.xml",
],
user_configs=[
"configs/access.xml",
@ -104,11 +105,9 @@ def started_cluster():
cluster.shutdown()
def run_query(instance, query, stdin=None, settings=None):
# type: (ClickHouseInstance, str, object, dict) -> str
def run_query(instance, query, *args, **kwargs):
logging.info("Running query '{}'...".format(query))
result = instance.query(query, stdin=stdin, settings=settings)
result = instance.query(query, *args, **kwargs)
logging.info("Query finished")
return result
@ -129,7 +128,7 @@ def run_query(instance, query, stdin=None, settings=None):
],
)
def test_put(started_cluster, maybe_auth, positive, compression):
# type: (ClickHouseCluster) -> None
# type: (ClickHouseCluster, str, bool, str) -> None
bucket = (
started_cluster.minio_bucket
@ -496,7 +495,7 @@ def test_put_get_with_globs(started_cluster):
],
)
def test_multipart(started_cluster, maybe_auth, positive):
# type: (ClickHouseCluster) -> None
# type: (ClickHouseCluster, str, bool) -> None
bucket = (
started_cluster.minio_bucket
@ -529,7 +528,7 @@ def test_multipart(started_cluster, maybe_auth, positive):
maybe_auth,
table_format,
)
put_query_id = uuid.uuid4().hex
try:
run_query(
instance,
@ -539,6 +538,7 @@ def test_multipart(started_cluster, maybe_auth, positive):
"s3_min_upload_part_size": min_part_size_bytes,
"s3_max_single_part_upload_size": 0,
},
query_id=put_query_id,
)
except helpers.client.QueryRuntimeException:
if positive:
@ -583,6 +583,24 @@ def test_multipart(started_cluster, maybe_auth, positive):
== "\t".join(map(str, [total_rows, total_rows * 2, total_rows * 3])) + "\n"
)
if positive:
instance.query("SYSTEM FLUSH LOGS")
blob_storage_log = instance.query(f"SELECT * FROM system.blob_storage_log")
result = instance.query(
f"""SELECT
countIf(event_type == 'MultiPartUploadCreate'),
countIf(event_type == 'MultiPartUploadWrite'),
countIf(event_type == 'MultiPartUploadComplete'),
count()
FROM system.blob_storage_log WHERE query_id = '{put_query_id}'"""
)
r = result.strip().split("\t")
assert int(r[0]) == 1, blob_storage_log
assert int(r[1]) >= 1, blob_storage_log
assert int(r[2]) == 1, blob_storage_log
assert int(r[0]) + int(r[1]) + int(r[2]) == int(r[3]), blob_storage_log
def test_remote_host_filter(started_cluster):
instance = started_cluster.instances["restricted_dummy"]
@ -855,14 +873,34 @@ def test_storage_s3_put_uncompressed(started_cluster):
name, started_cluster.minio_ip, MINIO_INTERNAL_PORT, bucket, filename
),
)
run_query(instance, "INSERT INTO {} VALUES ({})".format(name, "),(".join(data)))
insert_query_id = uuid.uuid4().hex
data_sep = "),("
run_query(
instance,
"INSERT INTO {} VALUES ({})".format(name, data_sep.join(data)),
query_id=insert_query_id,
)
run_query(instance, "SELECT sum(id) FROM {}".format(name)).splitlines() == ["753"]
uncompressed_content = get_s3_file_content(started_cluster, bucket, filename)
assert sum([int(i.split(",")[1]) for i in uncompressed_content.splitlines()]) == 753
instance.query("SYSTEM FLUSH LOGS")
blob_storage_log = instance.query(f"SELECT * FROM system.blob_storage_log")
result = instance.query(
f"""SELECT
countIf(event_type == 'Upload'),
countIf(remote_path == '{filename}'),
countIf(bucket == '{bucket}'),
count()
FROM system.blob_storage_log WHERE query_id = '{insert_query_id}'"""
)
r = result.strip().split("\t")
assert int(r[0]) >= 1, blob_storage_log
assert all(col == r[0] for col in r), blob_storage_log
@pytest.mark.parametrize(
"extension,method",

View File

@ -64,4 +64,11 @@ Three arguments test
42144255
42144
42144255
-- Single argument tests
42
42
foo
foo
\N
\N
Testing the alias

View File

@ -83,7 +83,14 @@ SELECT concat(materialize(42 :: Int32), materialize(144 :: UInt64), materialize(
SELECT concat(42, 144);
SELECT concat(42, 144, 255);
SELECT '-- Single argument tests';
SELECT concat(42);
SELECT concat(materialize(42));
SELECT concat('foo');
SELECT concat(materialize('foo'));
SELECT concat(NULL);
SELECT concat(materialize(NULL :: Nullable(UInt64)));
SELECT CONCAT('Testing the ', 'alias');
SELECT concat(); -- { serverError 42 }
SELECT concat(1); -- { serverError 42 }

View File

@ -0,0 +1,12 @@
CreateQuery numbers_pv (children 2)
Identifier numbers_pv
SelectWithUnionQuery (children 1)
ExpressionList (children 1)
SelectQuery (children 3)
ExpressionList (children 1)
Asterisk
TablesInSelectQuery (children 1)
TablesInSelectQueryElement (children 1)
TableExpression (children 1)
TableIdentifier numbers
QueryParameter amount:UInt8

View File

@ -0,0 +1,3 @@
EXPLAIN AST
CREATE VIEW numbers_pv AS
SELECT * FROM numbers LIMIT {amount:UInt8};

View File

@ -0,0 +1,25 @@
-- { echoOn }
SELECT * FROM t1 JOIN t2 ON (t1.x <=> t2.x OR (t1.x IS NULL AND t2.x IS NULL)) ORDER BY t1.x NULLS LAST;
2 2 2 2
3 3 3 33
\N \N \N \N
SELECT * FROM t1 JOIN t2 ON (t1.x <=> t2.x OR t1.x IS NULL AND t1.y <=> t2.y AND t2.x IS NULL) ORDER BY t1.x NULLS LAST;
1 42 4 42
2 2 2 2
3 3 3 33
\N \N \N \N
SELECT * FROM t1 JOIN t2 ON (t1.x = t2.x OR t1.x IS NULL AND t2.x IS NULL) AND t1.y <=> t2.y ORDER BY t1.x NULLS LAST;
2 2 2 2
\N \N \N \N
SELECT * FROM t1 JOIN t2 ON (t1.x <=> t2.x OR t1.y <=> t2.y OR (t1.x IS NULL AND t1.y IS NULL AND t2.x IS NULL AND t2.y IS NULL)) ORDER BY t1.x NULLS LAST;
1 42 4 42
2 2 2 2
3 3 3 33
\N \N \N \N
SELECT * FROM t1 JOIN t2 ON (t1.x <=> t2.x OR (t1.x IS NULL AND t2.x IS NULL)) AND (t1.y == t2.y OR (t1.y IS NULL AND t2.y IS NULL)) AND COALESCE(t1.x, 0) != 2 ORDER BY t1.x NULLS LAST;
\N \N \N \N
SELECT x = y OR (x IS NULL AND y IS NULL) FROM t1 ORDER BY x NULLS LAST;
0
1
1
1

View File

@ -0,0 +1,27 @@
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;
CREATE TABLE t1 (x Nullable(Int64), y Nullable(UInt64)) ENGINE = TinyLog;
CREATE TABLE t2 (x Nullable(Int64), y Nullable(UInt64)) ENGINE = TinyLog;
INSERT INTO t1 VALUES (1,42), (2,2), (3,3), (NULL,NULL);
INSERT INTO t2 VALUES (NULL,NULL), (2,2), (3,33), (4,42);
SET allow_experimental_analyzer = 1;
-- { echoOn }
SELECT * FROM t1 JOIN t2 ON (t1.x <=> t2.x OR (t1.x IS NULL AND t2.x IS NULL)) ORDER BY t1.x NULLS LAST;
SELECT * FROM t1 JOIN t2 ON (t1.x <=> t2.x OR t1.x IS NULL AND t1.y <=> t2.y AND t2.x IS NULL) ORDER BY t1.x NULLS LAST;
SELECT * FROM t1 JOIN t2 ON (t1.x = t2.x OR t1.x IS NULL AND t2.x IS NULL) AND t1.y <=> t2.y ORDER BY t1.x NULLS LAST;
SELECT * FROM t1 JOIN t2 ON (t1.x <=> t2.x OR t1.y <=> t2.y OR (t1.x IS NULL AND t1.y IS NULL AND t2.x IS NULL AND t2.y IS NULL)) ORDER BY t1.x NULLS LAST;
SELECT * FROM t1 JOIN t2 ON (t1.x <=> t2.x OR (t1.x IS NULL AND t2.x IS NULL)) AND (t1.y == t2.y OR (t1.y IS NULL AND t2.y IS NULL)) AND COALESCE(t1.x, 0) != 2 ORDER BY t1.x NULLS LAST;
SELECT x = y OR (x IS NULL AND y IS NULL) FROM t1 ORDER BY x NULLS LAST;
-- { echoOff }
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;

View File

@ -0,0 +1,4 @@
1 2504 1
ok
1 200 1 1
ok

View File

@ -0,0 +1,57 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
EXCEPTION_TEXT="VIOLATED_CONSTRAINT"
EXCEPTION_SUCCESS_TEXT=ok
# CollapsingSortedAlgorithm::merge() also has a check for sign column value
# optimize_on_insert = 0 is required to avoid this automatic merge behavior
$CLICKHOUSE_CLIENT --query="SET optimize_on_insert=0;"
# CollapsingMergeTree
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS collapsing_merge_tree;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE collapsing_merge_tree
(
Key UInt32,
Count UInt16,
Sign Int8
)
ENGINE=CollapsingMergeTree(Sign) ORDER BY Key
SETTINGS add_implicit_sign_column_constraint_for_collapsing_engine=1;"
# Should succeed
$CLICKHOUSE_CLIENT --query="INSERT INTO collapsing_merge_tree VALUES (1, 2504, 1);"
$CLICKHOUSE_CLIENT --query="SELECT * FROM collapsing_merge_tree;"
# Should throw an exception
$CLICKHOUSE_CLIENT --query="INSERT INTO collapsing_merge_tree VALUES (1, 2504, 5);" 2>&1 \
| grep -q "$EXCEPTION_TEXT" && echo "$EXCEPTION_SUCCESS_TEXT" || echo "Did not throw an exception"
$CLICKHOUSE_CLIENT --query="DROP TABLE collapsing_merge_tree;"
# VersionedCollapsingMergeTree
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS versioned_collapsing_merge_tree;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE versioned_collapsing_merge_tree
(
Key UInt32,
Count UInt8,
Sign Int8,
Version UInt8
)
ENGINE=VersionedCollapsingMergeTree(Sign, Version) ORDER BY Key
SETTINGS add_implicit_sign_column_constraint_for_collapsing_engine=1;"
# Should succeed
$CLICKHOUSE_CLIENT --query="INSERT INTO versioned_collapsing_merge_tree VALUES (1, 2504, 1, 1);"
$CLICKHOUSE_CLIENT --query="SELECT * FROM versioned_collapsing_merge_tree;"
# Should throw an exception
$CLICKHOUSE_CLIENT --query="INSERT INTO versioned_collapsing_merge_tree VALUES (1, 2504, 5, 1);" 2>&1 \
| grep -q "$EXCEPTION_TEXT" && echo "$EXCEPTION_SUCCESS_TEXT" || echo "Did not throw an exception"
$CLICKHOUSE_CLIENT --query="DROP TABLE versioned_collapsing_merge_tree;"