mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 08:32:02 +00:00
Merge pull request #64393 from ClickHouse/vdimir/blob_storage_log_fix_recursion
Prevent recursive logging in blob_storage_log
This commit is contained in:
commit
5251febf03
@ -891,6 +891,7 @@ class IColumn;
|
|||||||
M(Bool, geo_distance_returns_float64_on_float64_arguments, true, "If all four arguments to `geoDistance`, `greatCircleDistance`, `greatCircleAngle` functions are Float64, return Float64 and use double precision for internal calculations. In previous ClickHouse versions, the functions always returned Float32.", 0) \
|
M(Bool, geo_distance_returns_float64_on_float64_arguments, true, "If all four arguments to `geoDistance`, `greatCircleDistance`, `greatCircleAngle` functions are Float64, return Float64 and use double precision for internal calculations. In previous ClickHouse versions, the functions always returned Float32.", 0) \
|
||||||
M(Bool, allow_get_client_http_header, false, "Allow to use the function `getClientHTTPHeader` which lets to obtain a value of an the current HTTP request's header. It is not enabled by default for security reasons, because some headers, such as `Cookie`, could contain sensitive info. Note that the `X-ClickHouse-*` and `Authentication` headers are always restricted and cannot be obtained with this function.", 0) \
|
M(Bool, allow_get_client_http_header, false, "Allow to use the function `getClientHTTPHeader` which lets to obtain a value of an the current HTTP request's header. It is not enabled by default for security reasons, because some headers, such as `Cookie`, could contain sensitive info. Note that the `X-ClickHouse-*` and `Authentication` headers are always restricted and cannot be obtained with this function.", 0) \
|
||||||
M(Bool, cast_string_to_dynamic_use_inference, false, "Use types inference during String to Dynamic conversion", 0) \
|
M(Bool, cast_string_to_dynamic_use_inference, false, "Use types inference during String to Dynamic conversion", 0) \
|
||||||
|
M(Bool, enable_blob_storage_log, true, "Write information about blob storage operations to system.blob_storage_log table", 0) \
|
||||||
\
|
\
|
||||||
/** Experimental functions */ \
|
/** Experimental functions */ \
|
||||||
M(Bool, allow_experimental_materialized_postgresql_table, false, "Allows to use the MaterializedPostgreSQL table engine. Disabled by default, because this feature is experimental", 0) \
|
M(Bool, allow_experimental_materialized_postgresql_table, false, "Allows to use the MaterializedPostgreSQL table engine. Disabled by default, because this feature is experimental", 0) \
|
||||||
|
@ -96,6 +96,7 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
|
|||||||
{"hdfs_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in HDFS table engine"},
|
{"hdfs_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in HDFS table engine"},
|
||||||
{"azure_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in AzureBlobStorage table engine"},
|
{"azure_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in AzureBlobStorage table engine"},
|
||||||
{"s3_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in S3 table engine"},
|
{"s3_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in S3 table engine"},
|
||||||
|
{"enable_blob_storage_log", true, true, "Write information about blob storage operations to system.blob_storage_log table"},
|
||||||
}},
|
}},
|
||||||
{"24.5", {{"allow_deprecated_error_prone_window_functions", true, false, "Allow usage of deprecated error prone window functions (neighbor, runningAccumulate, runningDifferenceStartingWithFirstValue, runningDifference)"},
|
{"24.5", {{"allow_deprecated_error_prone_window_functions", true, false, "Allow usage of deprecated error prone window functions (neighbor, runningAccumulate, runningDifferenceStartingWithFirstValue, runningDifference)"},
|
||||||
{"allow_experimental_join_condition", false, false, "Support join with inequal conditions which involve columns from both left and right table. e.g. t1.y < t2.y."},
|
{"allow_experimental_join_condition", false, false, "Support join with inequal conditions which involve columns from both left and right table. e.g. t1.y < t2.y."},
|
||||||
|
@ -23,6 +23,9 @@ void BlobStorageLogWriter::addEvent(
|
|||||||
if (!log)
|
if (!log)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
if (log->shouldIgnorePath(local_path_.empty() ? local_path : local_path_))
|
||||||
|
return;
|
||||||
|
|
||||||
if (!time_now.time_since_epoch().count())
|
if (!time_now.time_since_epoch().count())
|
||||||
time_now = std::chrono::system_clock::now();
|
time_now = std::chrono::system_clock::now();
|
||||||
|
|
||||||
|
@ -9,6 +9,8 @@
|
|||||||
#include <DataTypes/DataTypeLowCardinality.h>
|
#include <DataTypes/DataTypeLowCardinality.h>
|
||||||
#include <DataTypes/DataTypeDate.h>
|
#include <DataTypes/DataTypeDate.h>
|
||||||
|
|
||||||
|
#include <Storages/IStorage.h>
|
||||||
|
#include <Storages/MergeTree/MergeTreeData.h>
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
@ -69,4 +71,32 @@ void BlobStorageLogElement::appendToBlock(MutableColumns & columns) const
|
|||||||
columns[i++]->insert(error_message);
|
columns[i++]->insert(error_message);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void BlobStorageLog::addSettingsForQuery(ContextMutablePtr & mutable_context, IAST::QueryKind query_kind) const
|
||||||
|
{
|
||||||
|
SystemLog<BlobStorageLogElement>::addSettingsForQuery(mutable_context, query_kind);
|
||||||
|
|
||||||
|
if (query_kind == IAST::QueryKind::Insert)
|
||||||
|
mutable_context->setSetting("enable_blob_storage_log", false);
|
||||||
|
}
|
||||||
|
|
||||||
|
static std::string_view normalizePath(std::string_view path)
|
||||||
|
{
|
||||||
|
if (path.starts_with("./"))
|
||||||
|
path.remove_prefix(2);
|
||||||
|
if (path.ends_with("/"))
|
||||||
|
path.remove_suffix(1);
|
||||||
|
return path;
|
||||||
|
}
|
||||||
|
|
||||||
|
void BlobStorageLog::prepareTable()
|
||||||
|
{
|
||||||
|
SystemLog<BlobStorageLogElement>::prepareTable();
|
||||||
|
if (auto merge_tree_table = std::dynamic_pointer_cast<MergeTreeData>(getStorage()))
|
||||||
|
{
|
||||||
|
std::unique_lock lock{prepare_mutex};
|
||||||
|
const auto & relative_data_path = merge_tree_table->getRelativeDataPath();
|
||||||
|
prefix_to_ignore = normalizePath(relative_data_path);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -1,11 +1,14 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <Interpreters/SystemLog.h>
|
|
||||||
#include <Core/NamesAndTypes.h>
|
|
||||||
#include <Core/NamesAndAliases.h>
|
|
||||||
#include <Poco/Message.h>
|
|
||||||
#include <Storages/ColumnsDescription.h>
|
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
|
#include <shared_mutex>
|
||||||
|
|
||||||
|
#include <Poco/Message.h>
|
||||||
|
|
||||||
|
#include <Core/NamesAndAliases.h>
|
||||||
|
#include <Core/NamesAndTypes.h>
|
||||||
|
#include <Interpreters/SystemLog.h>
|
||||||
|
#include <Storages/ColumnsDescription.h>
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
@ -51,7 +54,23 @@ struct BlobStorageLogElement
|
|||||||
|
|
||||||
class BlobStorageLog : public SystemLog<BlobStorageLogElement>
|
class BlobStorageLog : public SystemLog<BlobStorageLogElement>
|
||||||
{
|
{
|
||||||
|
public:
|
||||||
using SystemLog<BlobStorageLogElement>::SystemLog;
|
using SystemLog<BlobStorageLogElement>::SystemLog;
|
||||||
|
|
||||||
|
/// We should not log events for table itself to avoid infinite recursion
|
||||||
|
bool shouldIgnorePath(const String & path) const
|
||||||
|
{
|
||||||
|
std::shared_lock lock{prepare_mutex};
|
||||||
|
return !prefix_to_ignore.empty() && path.starts_with(prefix_to_ignore);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected:
|
||||||
|
void prepareTable() override;
|
||||||
|
void addSettingsForQuery(ContextMutablePtr & mutable_context, IAST::QueryKind query_kind) const override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
mutable std::shared_mutex prepare_mutex;
|
||||||
|
String prefix_to_ignore;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -4103,6 +4103,13 @@ std::shared_ptr<BackupLog> Context::getBackupLog() const
|
|||||||
|
|
||||||
std::shared_ptr<BlobStorageLog> Context::getBlobStorageLog() const
|
std::shared_ptr<BlobStorageLog> Context::getBlobStorageLog() const
|
||||||
{
|
{
|
||||||
|
bool enable_blob_storage_log = settings.enable_blob_storage_log;
|
||||||
|
if (hasQueryContext())
|
||||||
|
enable_blob_storage_log = getQueryContext()->getSettingsRef().enable_blob_storage_log;
|
||||||
|
|
||||||
|
if (!enable_blob_storage_log)
|
||||||
|
return {};
|
||||||
|
|
||||||
SharedLockGuard lock(shared->mutex);
|
SharedLockGuard lock(shared->mutex);
|
||||||
|
|
||||||
if (!shared->system_logs)
|
if (!shared->system_logs)
|
||||||
|
@ -519,8 +519,7 @@ void SystemLog<LogElement>::flushImpl(const std::vector<LogElement> & to_flush,
|
|||||||
// we need query context to do inserts to target table with MV containing subqueries or joins
|
// we need query context to do inserts to target table with MV containing subqueries or joins
|
||||||
auto insert_context = Context::createCopy(context);
|
auto insert_context = Context::createCopy(context);
|
||||||
insert_context->makeQueryContext();
|
insert_context->makeQueryContext();
|
||||||
/// We always want to deliver the data to the original table regardless of the MVs
|
addSettingsForQuery(insert_context, IAST::QueryKind::Insert);
|
||||||
insert_context->setSetting("materialized_views_ignore_errors", true);
|
|
||||||
|
|
||||||
InterpreterInsertQuery interpreter(query_ptr, insert_context);
|
InterpreterInsertQuery interpreter(query_ptr, insert_context);
|
||||||
BlockIO io = interpreter.execute();
|
BlockIO io = interpreter.execute();
|
||||||
@ -541,13 +540,18 @@ void SystemLog<LogElement>::flushImpl(const std::vector<LogElement> & to_flush,
|
|||||||
LOG_TRACE(log, "Flushed system log up to offset {}", to_flush_end);
|
LOG_TRACE(log, "Flushed system log up to offset {}", to_flush_end);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <typename LogElement>
|
||||||
|
StoragePtr SystemLog<LogElement>::getStorage() const
|
||||||
|
{
|
||||||
|
return DatabaseCatalog::instance().tryGetTable(table_id, getContext());
|
||||||
|
}
|
||||||
|
|
||||||
template <typename LogElement>
|
template <typename LogElement>
|
||||||
void SystemLog<LogElement>::prepareTable()
|
void SystemLog<LogElement>::prepareTable()
|
||||||
{
|
{
|
||||||
String description = table_id.getNameForLogs();
|
String description = table_id.getNameForLogs();
|
||||||
|
|
||||||
auto table = DatabaseCatalog::instance().tryGetTable(table_id, getContext());
|
auto table = getStorage();
|
||||||
if (table)
|
if (table)
|
||||||
{
|
{
|
||||||
if (old_create_query.empty())
|
if (old_create_query.empty())
|
||||||
@ -596,10 +600,9 @@ void SystemLog<LogElement>::prepareTable()
|
|||||||
merges_lock = table->getActionLock(ActionLocks::PartsMerge);
|
merges_lock = table->getActionLock(ActionLocks::PartsMerge);
|
||||||
|
|
||||||
auto query_context = Context::createCopy(context);
|
auto query_context = Context::createCopy(context);
|
||||||
/// As this operation is performed automatically we don't want it to fail because of user dependencies on log tables
|
|
||||||
query_context->setSetting("check_table_dependencies", Field{false});
|
|
||||||
query_context->setSetting("check_referential_table_dependencies", Field{false});
|
|
||||||
query_context->makeQueryContext();
|
query_context->makeQueryContext();
|
||||||
|
addSettingsForQuery(query_context, IAST::QueryKind::Rename);
|
||||||
|
|
||||||
InterpreterRenameQuery(rename, query_context).execute();
|
InterpreterRenameQuery(rename, query_context).execute();
|
||||||
|
|
||||||
/// The required table will be created.
|
/// The required table will be created.
|
||||||
@ -616,6 +619,7 @@ void SystemLog<LogElement>::prepareTable()
|
|||||||
|
|
||||||
auto query_context = Context::createCopy(context);
|
auto query_context = Context::createCopy(context);
|
||||||
query_context->makeQueryContext();
|
query_context->makeQueryContext();
|
||||||
|
addSettingsForQuery(query_context, IAST::QueryKind::Create);
|
||||||
|
|
||||||
auto create_query_ast = getCreateTableQuery();
|
auto create_query_ast = getCreateTableQuery();
|
||||||
InterpreterCreateQuery interpreter(create_query_ast, query_context);
|
InterpreterCreateQuery interpreter(create_query_ast, query_context);
|
||||||
@ -630,6 +634,22 @@ void SystemLog<LogElement>::prepareTable()
|
|||||||
is_prepared = true;
|
is_prepared = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <typename LogElement>
|
||||||
|
void SystemLog<LogElement>::addSettingsForQuery(ContextMutablePtr & mutable_context, IAST::QueryKind query_kind) const
|
||||||
|
{
|
||||||
|
if (query_kind == IAST::QueryKind::Insert)
|
||||||
|
{
|
||||||
|
/// We always want to deliver the data to the original table regardless of the MVs
|
||||||
|
mutable_context->setSetting("materialized_views_ignore_errors", true);
|
||||||
|
}
|
||||||
|
else if (query_kind == IAST::QueryKind::Rename)
|
||||||
|
{
|
||||||
|
/// As this operation is performed automatically we don't want it to fail because of user dependencies on log tables
|
||||||
|
mutable_context->setSetting("check_table_dependencies", Field{false});
|
||||||
|
mutable_context->setSetting("check_referential_table_dependencies", Field{false});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
template <typename LogElement>
|
template <typename LogElement>
|
||||||
ASTPtr SystemLog<LogElement>::getCreateTableQuery()
|
ASTPtr SystemLog<LogElement>::getCreateTableQuery()
|
||||||
{
|
{
|
||||||
|
@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
#include <Interpreters/StorageID.h>
|
#include <Interpreters/StorageID.h>
|
||||||
#include <Common/SystemLogBase.h>
|
#include <Common/SystemLogBase.h>
|
||||||
|
#include <Parsers/IAST.h>
|
||||||
|
|
||||||
#include <boost/noncopyable.hpp>
|
#include <boost/noncopyable.hpp>
|
||||||
|
|
||||||
@ -139,6 +140,17 @@ protected:
|
|||||||
using ISystemLog::thread_mutex;
|
using ISystemLog::thread_mutex;
|
||||||
using Base::queue;
|
using Base::queue;
|
||||||
|
|
||||||
|
StoragePtr getStorage() const;
|
||||||
|
|
||||||
|
/** Creates new table if it does not exist.
|
||||||
|
* Renames old table if its structure is not suitable.
|
||||||
|
* This cannot be done in constructor to avoid deadlock while renaming a table under locked Context when SystemLog object is created.
|
||||||
|
*/
|
||||||
|
void prepareTable() override;
|
||||||
|
|
||||||
|
/// Some tables can override settings for internal queries
|
||||||
|
virtual void addSettingsForQuery(ContextMutablePtr & mutable_context, IAST::QueryKind query_kind) const;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
/* Saving thread data */
|
/* Saving thread data */
|
||||||
const StorageID table_id;
|
const StorageID table_id;
|
||||||
@ -147,12 +159,6 @@ private:
|
|||||||
String old_create_query;
|
String old_create_query;
|
||||||
bool is_prepared = false;
|
bool is_prepared = false;
|
||||||
|
|
||||||
/** Creates new table if it does not exist.
|
|
||||||
* Renames old table if its structure is not suitable.
|
|
||||||
* This cannot be done in constructor to avoid deadlock while renaming a table under locked Context when SystemLog object is created.
|
|
||||||
*/
|
|
||||||
void prepareTable() override;
|
|
||||||
|
|
||||||
void savingThreadFunction() override;
|
void savingThreadFunction() override;
|
||||||
|
|
||||||
/// flushImpl can be executed only in saving_thread.
|
/// flushImpl can be executed only in saving_thread.
|
||||||
|
Loading…
Reference in New Issue
Block a user