Merge branch 'master' into urlCluster

Kruglov Pavel 2023-05-16 16:21:23 +02:00 committed by GitHub
commit 4530f38fdf
89 changed files with 3717 additions and 806 deletions

View File

@ -103,11 +103,19 @@ set (SRCS_CONTEXT
)
if (ARCH_AARCH64)
if (OS_DARWIN)
set (SRCS_CONTEXT ${SRCS_CONTEXT}
"${LIBRARY_DIR}/libs/context/src/asm/jump_arm64_aapcs_macho_gas.S"
"${LIBRARY_DIR}/libs/context/src/asm/make_arm64_aapcs_macho_gas.S"
"${LIBRARY_DIR}/libs/context/src/asm/ontop_arm64_aapcs_macho_gas.S"
)
else()
set (SRCS_CONTEXT ${SRCS_CONTEXT}
"${LIBRARY_DIR}/libs/context/src/asm/jump_arm64_aapcs_elf_gas.S"
"${LIBRARY_DIR}/libs/context/src/asm/make_arm64_aapcs_elf_gas.S"
"${LIBRARY_DIR}/libs/context/src/asm/ontop_arm64_aapcs_elf_gas.S"
)
endif()
elseif (ARCH_PPC64LE)
set (SRCS_CONTEXT ${SRCS_CONTEXT}
"${LIBRARY_DIR}/libs/context/src/asm/jump_ppc64_sysv_elf_gas.S"

View File

@ -1,15 +1,30 @@
set (SRC_DIR "${ClickHouse_SOURCE_DIR}/contrib/googletest/googletest")
set (SRC_DIR "${ClickHouse_SOURCE_DIR}/contrib/googletest")
add_library(_gtest "${SRC_DIR}/src/gtest-all.cc")
add_library(_gtest "${SRC_DIR}/googletest/src/gtest-all.cc")
set_target_properties(_gtest PROPERTIES VERSION "1.0.0")
target_compile_definitions (_gtest PUBLIC GTEST_HAS_POSIX_RE=0)
target_include_directories(_gtest SYSTEM PUBLIC "${SRC_DIR}/include")
target_include_directories(_gtest PRIVATE "${SRC_DIR}")
target_include_directories(_gtest SYSTEM PUBLIC "${SRC_DIR}/googletest/include")
target_include_directories(_gtest PRIVATE "${SRC_DIR}/googletest")
add_library(_gtest_main "${SRC_DIR}/src/gtest_main.cc")
add_library(_gtest_main "${SRC_DIR}/googletest/src/gtest_main.cc")
set_target_properties(_gtest_main PROPERTIES VERSION "1.0.0")
target_link_libraries(_gtest_main PUBLIC _gtest)
add_library(_gtest_all INTERFACE)
target_link_libraries(_gtest_all INTERFACE _gtest _gtest_main)
add_library(ch_contrib::gtest_all ALIAS _gtest_all)
add_library(_gmock "${SRC_DIR}/googlemock/src/gmock-all.cc")
set_target_properties(_gmock PROPERTIES VERSION "1.0.0")
target_compile_definitions (_gmock PUBLIC GTEST_HAS_POSIX_RE=0)
target_include_directories(_gmock SYSTEM PUBLIC "${SRC_DIR}/googlemock/include" "${SRC_DIR}/googletest/include")
target_include_directories(_gmock PRIVATE "${SRC_DIR}/googlemock")
add_library(_gmock_main "${SRC_DIR}/googlemock/src/gmock_main.cc")
set_target_properties(_gmock_main PROPERTIES VERSION "1.0.0")
target_link_libraries(_gmock_main PUBLIC _gmock)
add_library(_gmock_all INTERFACE)
target_link_libraries(_gmock_all INTERFACE _gmock _gmock_main)
add_library(ch_contrib::gmock_all ALIAS _gmock_all)

View File

@ -19,8 +19,8 @@ Kafka lets you:
``` sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1],
name2 [type2],
name1 [type1] [ALIAS expr1],
name2 [type2] [ALIAS expr2],
...
) ENGINE = Kafka()
SETTINGS

View File

@ -172,7 +172,9 @@ Example of configuration for versions earlier than 22.8:
</storage_configuration>
```
Cache **configuration settings**:
File Cache **disk configuration settings**:
These settings should be defined in the disk configuration section.
- `path` - path to the directory with cache. Default: None, this setting is obligatory.
@ -182,7 +184,7 @@ Cache **configuration settings**:
- `enable_filesystem_query_cache_limit` - allow to limit the size of cache which is downloaded within each query (depends on user setting `max_query_cache_size`). Default: `false`.
- `enable_cache_hits_threshold` - a number, which defines how many times some data needs to be read before it will be cached. Default: `0`, e.g. the data is cached at the first attempt to read it.
- `enable_cache_hits_threshold` - number which defines how many times some data needs to be read before it will be cached. Default: `0`, i.e. the data is cached at the first attempt to read it.
- `do_not_evict_index_and_mark_files` - do not evict small frequently used files according to cache policy. Default: `false`. This setting was added in version 22.8. If you used filesystem cache before this version, then it will not work on versions starting from 22.8 if this setting is set to `true`. If you want to use this setting, clear old cache created before version 22.8 before upgrading.
@ -190,21 +192,23 @@ Cache **configuration settings**:
- `max_elements` - a limit for a number of cache files. Default: `1048576`.
Cache **query settings**:
File Cache **query/profile settings**:
Some of these settings disable cache features per query/profile that are enabled by default or in the disk configuration settings. For example, you can enable cache in the disk configuration and disable it per query/profile by setting `enable_filesystem_cache` to `false`. Also, setting `cache_on_write_operations` to `true` in the disk configuration means that the "write-through" cache is enabled. If you need to override this general setting for specific queries, setting `enable_filesystem_cache_on_write_operations` to `false` disables the write cache for that query/profile.
- `enable_filesystem_cache` - allows to disable cache per query even if storage policy was configured with `cache` disk type. Default: `true`.
- `read_from_filesystem_cache_if_exists_otherwise_bypass_cache` - allows to use cache in query only if it already exists, otherwise query data will not be written to local cache storage. Default: `false`.
- `enable_filesystem_cache_on_write_operations` - turn on `write-through` cache. This setting works only if setting `cache_on_write_operations` in cache configuration is turned on.
- `enable_filesystem_cache_on_write_operations` - turn on `write-through` cache. This setting works only if setting `cache_on_write_operations` in cache configuration is turned on. Default: `false`.
- `enable_filesystem_cache_log` - turn on logging to `system.filesystem_cache_log` table. Gives a detailed view of cache usage per query. Default: `false`.
- `enable_filesystem_cache_log` - turn on logging to `system.filesystem_cache_log` table. Gives a detailed view of cache usage per query. It can be turned on for specific queries or enabled in a profile. Default: `false`.
- `max_query_cache_size` - a limit for the cache size, which can be written to local cache storage. Requires enabled `enable_filesystem_query_cache_limit` in cache configuration. Default: `false`.
- `skip_download_if_exceeds_query_cache` - allows to change the behaviour of setting `max_query_cache_size`. Default: `true`. If this setting is turned on and cache download limit during query was reached, no more cache will be downloaded to cache storage. If this setting is turned off and cache download limit during query was reached, cache will still be written by cost of evicting previously downloaded (within current query) data, e.g. second behaviour allows to preserve `last recentltly used` behaviour while keeping query cache limit.
- `skip_download_if_exceeds_query_cache` - allows to change the behaviour of setting `max_query_cache_size`. Default: `true`. If this setting is turned on and the cache download limit was reached during the query, no more cache will be downloaded to cache storage. If this setting is turned off and the cache download limit was reached during the query, cache will still be written at the cost of evicting previously downloaded (within the current query) data, i.e. the second behaviour preserves the `last recently used` behaviour while keeping the query cache limit.
** Warning **
**Warning**
Cache configuration settings and cache query settings correspond to the latest ClickHouse version; earlier versions might not support everything.
Cache **system tables**:

View File

@ -12,7 +12,7 @@ Columns:
- `database` ([String](../../sql-reference/data-types/string.md)) — Database name.
- `table` ([String](../../sql-reference/data-types/string.md)) — Table name.
- `table` ([String](../../sql-reference/data-types/string.md)) — Table name. Empty if the policy is for a database.
- `id` ([UUID](../../sql-reference/data-types/uuid.md)) — Row policy ID.

View File

@ -14,8 +14,8 @@ Row policies makes sense only for users with readonly access. If user can modify
Syntax:
``` sql
CREATE [ROW] POLICY [IF NOT EXISTS | OR REPLACE] policy_name1 [ON CLUSTER cluster_name1] ON [db1.]table1
[, policy_name2 [ON CLUSTER cluster_name2] ON [db2.]table2 ...]
CREATE [ROW] POLICY [IF NOT EXISTS | OR REPLACE] policy_name1 [ON CLUSTER cluster_name1] ON [db1.]table1|db1.*
[, policy_name2 [ON CLUSTER cluster_name2] ON [db2.]table2|db2.* ...]
[FOR SELECT] USING condition
[AS {PERMISSIVE | RESTRICTIVE}]
[TO {role1 [, role2 ...] | ALL | ALL EXCEPT role1 [, role2 ...]}]
@ -76,6 +76,20 @@ CREATE ROW POLICY pol2 ON mydb.table1 USING c=2 AS RESTRICTIVE TO peter, antonio
enables the user `peter` to see rows only if both `b=1` AND `c=2`.
Database policies are combined with table policies.
For example, the following policies
``` sql
CREATE ROW POLICY pol1 ON mydb.* USING b=1 TO mira, peter
CREATE ROW POLICY pol2 ON mydb.table1 USING c=2 AS RESTRICTIVE TO peter, antonio
```
enables the user `peter` to see table1 rows only if both `b=1` AND `c=2` are satisfied, while
any other table in mydb has only the `b=1` policy applied for that user.
## ON CLUSTER Clause
Allows creating row policies on a cluster, see [Distributed DDL](../../../sql-reference/distributed-ddl.md).
@ -88,3 +102,5 @@ Allows creating row policies on a cluster, see [Distributed DDL](../../../sql-re
`CREATE ROW POLICY filter2 ON mydb.mytable USING a<1000 AND b=5 TO ALL EXCEPT mira`
`CREATE ROW POLICY filter3 ON mydb.mytable USING 1 TO admin`
`CREATE ROW POLICY filter4 ON mydb.* USING 1 TO admin`

View File

@ -1872,7 +1872,7 @@ try
}
if (current_connections)
LOG_INFO(log, "Closed all listening sockets. Waiting for {} outstanding connections.", current_connections);
LOG_WARNING(log, "Closed all listening sockets. Waiting for {} outstanding connections.", current_connections);
else
LOG_INFO(log, "Closed all listening sockets.");
@ -1884,7 +1884,7 @@ try
current_connections = waitServersToFinish(servers, config().getInt("shutdown_wait_unfinished", 5));
if (current_connections)
LOG_INFO(log, "Closed connections. But {} remain."
LOG_WARNING(log, "Closed connections. But {} remain."
" Tip: To increase wait time add to config: <shutdown_wait_unfinished>60</shutdown_wait_unfinished>", current_connections);
else
LOG_INFO(log, "Closed connections.");
@ -1900,7 +1900,7 @@ try
/// Dump coverage here, because std::atexit callback would not be called.
dumpCoverageReportIfPossible();
LOG_INFO(log, "Will shutdown forcefully.");
LOG_WARNING(log, "Will shutdown forcefully.");
safeExit(0);
}
});

View File

@ -22,7 +22,7 @@ String RowPolicyName::toString() const
name += backQuoteIfNeed(database);
name += '.';
}
name += backQuoteIfNeed(table_name);
name += (table_name == RowPolicyName::ANY_TABLE_MARK ? "*" : backQuoteIfNeed(table_name));
return name;
}

View File

@ -9,6 +9,8 @@ namespace DB
/// Represents the full name of a row policy, e.g. "myfilter ON mydb.mytable".
struct RowPolicyName
{
static constexpr char ANY_TABLE_MARK[] = "";
String short_name;
String database;
String table_name;

View File

@ -35,7 +35,13 @@ RowPolicyFilterPtr EnabledRowPolicies::getFilter(const String & database, const
auto loaded = mixed_filters.load();
auto it = loaded->find({database, table_name, filter_type});
if (it == loaded->end())
{ /// Look for a database-level policy if a table policy was not found
it = loaded->find({database, RowPolicyName::ANY_TABLE_MARK, filter_type});
if (it == loaded->end())
{
return {};
}
}
return it->second;
}

View File

@ -228,25 +228,25 @@ void RolesOrUsersSet::add(const std::vector<UUID> & ids_)
bool RolesOrUsersSet::match(const UUID & id) const
{
return (all || ids.count(id)) && !except_ids.count(id);
return (all || ids.contains(id)) && !except_ids.contains(id);
}
bool RolesOrUsersSet::match(const UUID & user_id, const boost::container::flat_set<UUID> & enabled_roles) const
{
if (!all && !ids.count(user_id))
if (!all && !ids.contains(user_id))
{
bool found_enabled_role = std::any_of(
enabled_roles.begin(), enabled_roles.end(), [this](const UUID & enabled_role) { return ids.count(enabled_role); });
enabled_roles.begin(), enabled_roles.end(), [this](const UUID & enabled_role) { return ids.contains(enabled_role); });
if (!found_enabled_role)
return false;
}
if (except_ids.count(user_id))
if (except_ids.contains(user_id))
return false;
bool in_except_list = std::any_of(
enabled_roles.begin(), enabled_roles.end(), [this](const UUID & enabled_role) { return except_ids.count(enabled_role); });
enabled_roles.begin(), enabled_roles.end(), [this](const UUID & enabled_role) { return except_ids.contains(enabled_role); });
return !in_except_list;
}

View File

@ -35,6 +35,9 @@ struct RowPolicy : public IAccessEntity
void setPermissive(bool permissive_ = true) { setRestrictive(!permissive_); }
bool isPermissive() const { return !isRestrictive(); }
/// Applied to the entire database
bool isForDatabase() const { return full_name.table_name == RowPolicyName::ANY_TABLE_MARK; }
/// Sets that the policy is restrictive.
/// A row is only accessible if at least one of the permissive policies passes,
/// in addition to all the restrictive policies.

View File

@ -16,7 +16,8 @@ namespace DB
{
namespace
{
/// Accumulates filters from multiple row policies and joins them using the AND logical operation.
/// Helper to accumulate filters from multiple row policies and join them together
/// by AND or OR logical operations.
class FiltersMixer
{
public:
@ -148,9 +149,11 @@ void RowPolicyCache::ensureAllRowPoliciesRead()
for (const UUID & id : access_control.findAll<RowPolicy>())
{
auto quota = access_control.tryRead<RowPolicy>(id);
if (quota)
all_policies.emplace(id, PolicyInfo(quota));
auto policy = access_control.tryRead<RowPolicy>(id);
if (policy)
{
all_policies.emplace(id, PolicyInfo(policy));
}
}
}
@ -215,22 +218,26 @@ void RowPolicyCache::mixFiltersFor(EnabledRowPolicies & enabled)
std::vector<RowPolicyPtr> policies;
};
std::unordered_map<MixedFiltersKey, MixerWithNames, Hash> mixers;
std::unordered_map<MixedFiltersKey, MixerWithNames, Hash> database_mixers;
/// populate database_mixers using database-level policies
/// to aggregate (mix) rules per database
for (const auto & [policy_id, info] : all_policies)
{
if (info.isForDatabase())
{
const auto & policy = *info.policy;
bool match = info.roles->match(enabled.params.user_id, enabled.params.enabled_roles);
MixedFiltersKey key;
key.database = info.database_and_table_name->first;
key.table_name = info.database_and_table_name->second;
for (auto filter_type : collections::range(0, RowPolicyFilterType::MAX))
{
auto filter_type_i = static_cast<size_t>(filter_type);
if (info.parsed_filters[filter_type_i])
{
key.filter_type = filter_type;
auto & mixer = mixers[key];
MixedFiltersKey key{info.database_and_table_name->first,
info.database_and_table_name->second,
filter_type};
auto & mixer = database_mixers[key];
mixer.database_and_table_name = info.database_and_table_name;
if (match)
{
@ -240,9 +247,69 @@ void RowPolicyCache::mixFiltersFor(EnabledRowPolicies & enabled)
}
}
}
}
std::unordered_map<MixedFiltersKey, MixerWithNames, Hash> table_mixers;
/// populate table_mixers using database_mixers and table-level policies
for (const auto & [policy_id, info] : all_policies)
{
if (!info.isForDatabase())
{
const auto & policy = *info.policy;
bool match = info.roles->match(enabled.params.user_id, enabled.params.enabled_roles);
for (auto filter_type : collections::range(0, RowPolicyFilterType::MAX))
{
auto filter_type_i = static_cast<size_t>(filter_type);
if (info.parsed_filters[filter_type_i])
{
MixedFiltersKey key{info.database_and_table_name->first,
info.database_and_table_name->second,
filter_type};
auto table_it = table_mixers.find(key);
if (table_it == table_mixers.end())
{ /// no exact match - create new mixer
MixedFiltersKey database_key = key;
database_key.table_name = RowPolicyName::ANY_TABLE_MARK;
auto database_it = database_mixers.find(database_key);
if (database_it == database_mixers.end())
{
table_it = table_mixers.try_emplace(key).first;
}
else
{
/// table policies are based on database ones
table_it = table_mixers.insert({key, database_it->second}).first;
}
}
auto & mixer = table_it->second; /// getting table level mixer
mixer.database_and_table_name = info.database_and_table_name;
if (match)
{
mixer.mixer.add(info.parsed_filters[filter_type_i], policy.isRestrictive());
mixer.policies.push_back(info.policy);
}
}
}
}
}
auto mixed_filters = boost::make_shared<MixedFiltersMap>();
for (auto & [key, mixer] : mixers)
/// Retrieve aggregated policies from the mixers.
/// If a policy exists for this particular table, table_mixers has all the needed information
/// (database-level policies are already applied there);
/// otherwise we look for a database-level policy using RowPolicyName::ANY_TABLE_MARK.
/// Consider restrictive policies a=1 for db.t, b=2 for db.* and c=3 for db.*.
/// We are going to have two items in mixed_filters:
/// 1. a=1 AND b=2 AND c=3 for db.t (comes from table_mixers, where it was created with the help of database_mixers)
/// 2. b=2 AND c=3 for db.* (comes directly from database_mixers)
for (auto * mixer_map_ptr : {&table_mixers, &database_mixers})
{
for (auto & [key, mixer] : *mixer_map_ptr)
{
auto mixed_filter = std::make_shared<RowPolicyFilter>();
mixed_filter->database_and_table_name = std::move(mixer.database_and_table_name);
@ -250,6 +317,7 @@ void RowPolicyCache::mixFiltersFor(EnabledRowPolicies & enabled)
mixed_filter->policies = std::move(mixer.policies);
mixed_filters->emplace(key, std::move(mixed_filter));
}
}
enabled.mixed_filters.store(mixed_filters);
}

View File

@ -29,6 +29,7 @@ private:
explicit PolicyInfo(const RowPolicyPtr & policy_) { setPolicy(policy_); }
void setPolicy(const RowPolicyPtr & policy_);
bool isForDatabase() const { return policy->isForDatabase(); }
RowPolicyPtr policy;
const RolesOrUsersSet * roles = nullptr;
std::shared_ptr<const std::pair<String, String>> database_and_table_name;

View File

@ -105,21 +105,21 @@ void SettingsConstraints::check(const Settings & current_settings, const Setting
if (SettingsProfileElements::isAllowBackupSetting(element.setting_name))
continue;
if (!element.value.isNull())
if (element.value)
{
SettingChange value(element.setting_name, element.value);
SettingChange value(element.setting_name, *element.value);
check(current_settings, value);
}
if (!element.min_value.isNull())
if (element.min_value)
{
SettingChange value(element.setting_name, element.min_value);
SettingChange value(element.setting_name, *element.min_value);
check(current_settings, value);
}
if (!element.max_value.isNull())
if (element.max_value)
{
SettingChange value(element.setting_name, element.max_value);
SettingChange value(element.setting_name, *element.max_value);
check(current_settings, value);
}

View File

@ -63,18 +63,18 @@ void SettingsProfileElement::init(const ASTSettingsProfileElement & ast, const A
max_value = ast.max_value;
writability = ast.writability;
if (!value.isNull())
value = Settings::castValueUtil(setting_name, value);
if (!min_value.isNull())
min_value = Settings::castValueUtil(setting_name, min_value);
if (!max_value.isNull())
max_value = Settings::castValueUtil(setting_name, max_value);
if (value)
value = Settings::castValueUtil(setting_name, *value);
if (min_value)
min_value = Settings::castValueUtil(setting_name, *min_value);
if (max_value)
max_value = Settings::castValueUtil(setting_name, *max_value);
}
}
bool SettingsProfileElement::isConstraint() const
{
return this->writability || !this->min_value.isNull() || !this->max_value.isNull();
return this->writability || this->min_value || this->max_value;
}
std::shared_ptr<ASTSettingsProfileElement> SettingsProfileElement::toAST() const
@ -187,8 +187,8 @@ Settings SettingsProfileElements::toSettings() const
Settings res;
for (const auto & elem : *this)
{
if (!elem.setting_name.empty() && !isAllowBackupSetting(elem.setting_name) && !elem.value.isNull())
res.set(elem.setting_name, elem.value);
if (!elem.setting_name.empty() && !isAllowBackupSetting(elem.setting_name) && elem.value)
res.set(elem.setting_name, *elem.value);
}
return res;
}
@ -200,8 +200,8 @@ SettingsChanges SettingsProfileElements::toSettingsChanges() const
{
if (!elem.setting_name.empty() && !isAllowBackupSetting(elem.setting_name))
{
if (!elem.value.isNull())
res.push_back({elem.setting_name, elem.value});
if (elem.value)
res.push_back({elem.setting_name, *elem.value});
}
}
return res;
@ -214,8 +214,8 @@ SettingsConstraints SettingsProfileElements::toSettingsConstraints(const AccessC
if (!elem.setting_name.empty() && elem.isConstraint() && !isAllowBackupSetting(elem.setting_name))
res.set(
elem.setting_name,
elem.min_value,
elem.max_value,
elem.min_value ? *elem.min_value : Field{},
elem.max_value ? *elem.max_value : Field{},
elem.writability ? *elem.writability : SettingConstraintWritability::WRITABLE);
return res;
}
@ -240,8 +240,8 @@ bool SettingsProfileElements::isBackupAllowed() const
{
for (const auto & setting : *this)
{
if (isAllowBackupSetting(setting.setting_name))
return static_cast<bool>(SettingFieldBool{setting.value});
if (isAllowBackupSetting(setting.setting_name) && setting.value)
return static_cast<bool>(SettingFieldBool{*setting.value});
}
return true;
}

View File

@ -23,9 +23,9 @@ struct SettingsProfileElement
std::optional<UUID> parent_profile;
String setting_name;
Field value;
Field min_value;
Field max_value;
std::optional<Field> value;
std::optional<Field> min_value;
std::optional<Field> max_value;
std::optional<SettingConstraintWritability> writability;
auto toTuple() const { return std::tie(parent_profile, setting_name, value, min_value, max_value, writability); }

View File

@ -43,6 +43,7 @@ struct KolmogorovSmirnov : public StatisticalSample<Float64, Float64>
Float64 now_s = 0;
UInt64 pos_x = 0;
UInt64 pos_y = 0;
UInt64 pos_tmp;
UInt64 n1 = x.size();
UInt64 n2 = y.size();
@ -65,14 +66,22 @@ struct KolmogorovSmirnov : public StatisticalSample<Float64, Float64>
now_s -= n2_d;
++pos_y;
}
max_s = std::max(max_s, now_s);
min_s = std::min(min_s, now_s);
}
else
{
now_s += n1_d;
++pos_x;
pos_tmp = pos_x + 1;
while (pos_tmp < x.size() && unlikely(fabs(x[pos_tmp] - x[pos_x]) <= tol))
pos_tmp++;
now_s += n1_d * (pos_tmp - pos_x);
pos_x = pos_tmp;
pos_tmp = pos_y + 1;
while (pos_tmp < y.size() && unlikely(fabs(y[pos_tmp] - y[pos_y]) <= tol))
pos_tmp++;
now_s -= n2_d * (pos_tmp - pos_y);
pos_y = pos_tmp;
}
max_s = std::max(max_s, now_s);
min_s = std::min(min_s, now_s);
}
now_s += n1_d * (x.size() - pos_x) - n2_d * (y.size() - pos_y);
min_s = std::min(min_s, now_s);

View File

@ -253,7 +253,6 @@ std::unique_ptr<WriteBuffer> BackupWriterS3::writeFile(const String & file_name)
fs::path(s3_uri.key) / file_name,
request_settings,
std::nullopt,
DBMS_DEFAULT_BUFFER_SIZE,
threadPoolCallbackRunner<void>(BackupsIOThreadPool::get(), "BackupWriterS3"));
}

View File

@ -613,6 +613,7 @@ if (ENABLE_TESTS)
target_link_libraries(unit_tests_dbms PRIVATE
ch_contrib::gtest_all
ch_contrib::gmock_all
clickhouse_functions
clickhouse_aggregate_functions
clickhouse_parsers

View File

@ -3,11 +3,18 @@
namespace DB
{
thread_local FiberInfo current_fiber_info;
AsyncTaskExecutor::AsyncTaskExecutor(std::unique_ptr<AsyncTask> task_) : task(std::move(task_))
{
createFiber();
}
FiberInfo AsyncTaskExecutor::getCurrentFiberInfo()
{
return current_fiber_info;
}
void AsyncTaskExecutor::resume()
{
if (routine_is_finished)
@ -31,7 +38,10 @@ void AsyncTaskExecutor::resume()
void AsyncTaskExecutor::resumeUnlocked()
{
auto parent_fiber_info = current_fiber_info;
current_fiber_info = FiberInfo{&fiber, &parent_fiber_info};
fiber = std::move(fiber).resume();
current_fiber_info = parent_fiber_info;
}
void AsyncTaskExecutor::cancel()

View File

@ -24,6 +24,11 @@ enum class AsyncEventTimeoutType
using AsyncCallback = std::function<void(int, Poco::Timespan, AsyncEventTimeoutType, const std::string &, uint32_t)>;
using ResumeCallback = std::function<void()>;
struct FiberInfo
{
const Fiber * fiber = nullptr;
const FiberInfo * parent_fiber_info = nullptr;
};
/// Base class for a task that will be executed in a fiber.
/// It has only one method - run, that takes 2 callbacks:
@ -75,6 +80,7 @@ public:
};
#endif
static FiberInfo getCurrentFiberInfo();
protected:
/// Method that is called in resume() before actual fiber resuming.
/// If it returns false, resume() will return immediately without actual fiber resuming.
@ -118,6 +124,48 @@ private:
std::unique_ptr<AsyncTask> task;
};
/// Simple implementation for fiber local variable.
template <typename T>
struct FiberLocal
{
public:
FiberLocal()
{
/// Initialize the main instance for this thread. Instances for fibers will inherit it
/// (this is needed because the main instance could be changed before fibers are created,
/// and those changes should be visible inside the fibers).
data[nullptr] = T();
}
T & operator*()
{
return get();
}
T * operator->()
{
return &get();
}
private:
T & get()
{
return getInstanceForFiber(AsyncTaskExecutor::getCurrentFiberInfo());
}
T & getInstanceForFiber(FiberInfo info)
{
auto it = data.find(info.fiber);
/// If it's the first request, we need to initialize the instance for the fiber
/// using the instance from the parent fiber or from the main thread that created the fiber.
if (it == data.end())
it = data.insert({info.fiber, getInstanceForFiber(*info.parent_fiber_info)}).first;
return it->second;
}
std::unordered_map<const Fiber *, T> data;
};
String getSocketTimeoutExceededMessageByTimeoutType(AsyncEventTimeoutType type, Poco::Timespan timeout, const String & socket_description);
}
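
The `FiberLocal` template above keys per-fiber instances by `const Fiber *` and lazily copies the value from the parent fiber (or from the main thread, stored under the `nullptr` key) on first access, mirroring what `resumeUnlocked()` sets up before resuming a fiber. Below is a minimal, self-contained sketch of that lookup-with-inheritance logic; `FakeFiber` and `FakeFiberInfo` are hypothetical stand-ins for the real `Fiber` and `FiberInfo` types, used only for illustration.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>

struct FakeFiber {};                                // stand-in: only its address is used as a key

struct FakeFiberInfo
{
    const FakeFiber * fiber = nullptr;              // nullptr denotes the main thread
    const FakeFiberInfo * parent_fiber_info = nullptr;
};

template <typename T>
struct SimplifiedFiberLocal
{
    SimplifiedFiberLocal() { data[nullptr] = T(); } // main-thread instance, created up front

    T & get(const FakeFiberInfo & info) { return getInstanceForFiber(info); }

private:
    T & getInstanceForFiber(const FakeFiberInfo & info)
    {
        auto it = data.find(info.fiber);
        if (it == data.end())                       // first access from this fiber: inherit from the parent
            it = data.insert({info.fiber, getInstanceForFiber(*info.parent_fiber_info)}).first;
        return it->second;
    }

    std::unordered_map<const FakeFiber *, T> data;
};

int main()
{
    SimplifiedFiberLocal<std::string> ctx;

    FakeFiberInfo main_info;                        // main thread, no parent
    ctx.get(main_info) = "trace-42";                // set before any fiber exists

    FakeFiber fiber;
    FakeFiberInfo fiber_info{&fiber, &main_info};   // what resumeUnlocked() sets up before resuming
    assert(ctx.get(fiber_info) == "trace-42");      // the fiber inherits the main-thread value

    ctx.get(fiber_info) = "trace-43";               // a fiber-local change...
    assert(ctx.get(main_info) == "trace-42");       // ...does not leak back to the main thread
}
```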

View File

@ -7,12 +7,16 @@
#include <Core/Settings.h>
#include <IO/Operators.h>
#include <Common/AsyncTaskExecutor.h>
namespace DB
{
namespace OpenTelemetry
{
thread_local TracingContextOnThread current_thread_trace_context;
/// This code can be executed inside several fibers in one thread,
/// so we should use a fiber-local tracing context.
thread_local FiberLocal<TracingContextOnThread> current_fiber_trace_context;
bool Span::addAttribute(std::string_view name, UInt64 value) noexcept
{
@ -104,7 +108,7 @@ bool Span::addAttributeImpl(std::string_view name, std::string_view value) noexc
SpanHolder::SpanHolder(std::string_view _operation_name, SpanKind _kind)
{
if (!current_thread_trace_context.isTraceEnabled())
if (!current_fiber_trace_context->isTraceEnabled())
{
return;
}
@ -112,8 +116,8 @@ SpanHolder::SpanHolder(std::string_view _operation_name, SpanKind _kind)
/// Use try-catch to make sure the ctor is exception safe.
try
{
this->trace_id = current_thread_trace_context.trace_id;
this->parent_span_id = current_thread_trace_context.span_id;
this->trace_id = current_fiber_trace_context->trace_id;
this->parent_span_id = current_fiber_trace_context->span_id;
this->span_id = thread_local_rng(); // create a new id for this span
this->operation_name = _operation_name;
this->kind = _kind;
@ -132,7 +136,7 @@ SpanHolder::SpanHolder(std::string_view _operation_name, SpanKind _kind)
}
/// Set current span as parent of other spans created later on this thread.
current_thread_trace_context.span_id = this->span_id;
current_fiber_trace_context->span_id = this->span_id;
}
void SpanHolder::finish() noexcept
@ -141,12 +145,12 @@ void SpanHolder::finish() noexcept
return;
// First of all, restore old value of current span.
assert(current_thread_trace_context.span_id == span_id);
current_thread_trace_context.span_id = parent_span_id;
assert(current_fiber_trace_context->span_id == span_id);
current_fiber_trace_context->span_id = parent_span_id;
try
{
auto log = current_thread_trace_context.span_log.lock();
auto log = current_fiber_trace_context->span_log.lock();
/// The log might be disabled, check it before use
if (log)
@ -269,7 +273,7 @@ void TracingContext::serialize(WriteBuffer & buf) const
const TracingContextOnThread & CurrentContext()
{
return current_thread_trace_context;
return *current_fiber_trace_context;
}
void TracingContextOnThread::reset() noexcept
@ -291,7 +295,7 @@ TracingContextHolder::TracingContextHolder(
/// If any exception is raised during the construction, the tracing is not enabled on current thread.
try
{
if (current_thread_trace_context.isTraceEnabled())
if (current_fiber_trace_context->isTraceEnabled())
{
///
/// This is not the normal case,
@ -304,15 +308,15 @@ TracingContextHolder::TracingContextHolder(
/// So this branch ensures this class can be instantiated multiple times on one same thread safely.
///
this->is_context_owner = false;
this->root_span.trace_id = current_thread_trace_context.trace_id;
this->root_span.parent_span_id = current_thread_trace_context.span_id;
this->root_span.trace_id = current_fiber_trace_context->trace_id;
this->root_span.parent_span_id = current_fiber_trace_context->span_id;
this->root_span.span_id = thread_local_rng();
this->root_span.operation_name = _operation_name;
this->root_span.start_time_us
= std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
/// Set the root span as parent of other spans created on current thread
current_thread_trace_context.span_id = this->root_span.span_id;
current_fiber_trace_context->span_id = this->root_span.span_id;
return;
}
@ -356,10 +360,10 @@ TracingContextHolder::TracingContextHolder(
}
/// Set up trace context on current thread only when the root span is successfully initialized.
current_thread_trace_context = _parent_trace_context;
current_thread_trace_context.span_id = this->root_span.span_id;
current_thread_trace_context.trace_flags = TRACE_FLAG_SAMPLED;
current_thread_trace_context.span_log = _span_log;
*current_fiber_trace_context = _parent_trace_context;
current_fiber_trace_context->span_id = this->root_span.span_id;
current_fiber_trace_context->trace_flags = TRACE_FLAG_SAMPLED;
current_fiber_trace_context->span_log = _span_log;
}
TracingContextHolder::~TracingContextHolder()
@ -371,7 +375,7 @@ TracingContextHolder::~TracingContextHolder()
try
{
auto shared_span_log = current_thread_trace_context.span_log.lock();
auto shared_span_log = current_fiber_trace_context->span_log.lock();
if (shared_span_log)
{
try
@ -402,11 +406,11 @@ TracingContextHolder::~TracingContextHolder()
if (this->is_context_owner)
{
/// Clear the context on current thread
current_thread_trace_context.reset();
current_fiber_trace_context->reset();
}
else
{
current_thread_trace_context.span_id = this->root_span.parent_span_id;
current_fiber_trace_context->span_id = this->root_span.parent_span_id;
}
}

View File

@ -339,37 +339,37 @@ void KeeperStorage::UncommittedState::applyDelta(const Delta & delta)
nodes.emplace(delta.path, UncommittedNode{.node = nullptr});
}
std::visit(
[&]<typename DeltaType>(const DeltaType & operation)
{
auto & [node, acls, last_applied_zxid] = nodes.at(delta.path);
std::visit(
[&, &my_node = node, &my_acls = acls, &my_last_applied_zxid = last_applied_zxid]<typename DeltaType>(const DeltaType & operation)
{
if constexpr (std::same_as<DeltaType, CreateNodeDelta>)
{
assert(!my_node);
my_node = std::make_shared<Node>();
my_node->stat = operation.stat;
my_node->setData(operation.data);
my_acls = operation.acls;
my_last_applied_zxid = delta.zxid;
assert(!node);
node = std::make_shared<Node>();
node->stat = operation.stat;
node->setData(operation.data);
acls = operation.acls;
last_applied_zxid = delta.zxid;
}
else if constexpr (std::same_as<DeltaType, RemoveNodeDelta>)
{
assert(my_node);
my_node = nullptr;
my_last_applied_zxid = delta.zxid;
assert(node);
node = nullptr;
last_applied_zxid = delta.zxid;
}
else if constexpr (std::same_as<DeltaType, UpdateNodeDelta>)
{
assert(my_node);
my_node->invalidateDigestCache();
assert(node);
node->invalidateDigestCache();
operation.update_fn(*node);
my_last_applied_zxid = delta.zxid;
last_applied_zxid = delta.zxid;
}
else if constexpr (std::same_as<DeltaType, SetACLDelta>)
{
my_acls = operation.acls;
my_last_applied_zxid = delta.zxid;
acls = operation.acls;
last_applied_zxid = delta.zxid;
}
},
delta.operation);

View File

@ -544,6 +544,7 @@ try
auto tmp_file = std::make_unique<TemporaryFileOnDisk>(disk_ptr);
auto buf = std::make_unique<WriteBufferFromTemporaryFile>(std::move(tmp_file));
buf->write(data.data, data.PAGE_SIZE_IN_BYTES);
buf->finalize();
buf->sync();
}
return true;

View File

@ -118,12 +118,7 @@ std::future<IAsynchronousReader::Result> AsynchronousReadIndirectBufferFromRemot
request.size = size;
request.offset = file_offset_of_buffer_end;
request.priority = base_priority + priority;
if (bytes_to_ignore)
{
request.ignore = bytes_to_ignore;
bytes_to_ignore = 0;
}
return reader.submit(request);
}
@ -165,8 +160,7 @@ void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilPosition(size_t pos
void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilEnd()
{
read_until_position = impl->getFileSize();
impl->setReadUntilPosition(*read_until_position);
setReadUntilPosition(impl->getFileSize());
}
@ -228,12 +222,13 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
chassert(memory.size() == read_settings.prefetch_buffer_size || memory.size() == read_settings.remote_fs_buffer_size);
std::tie(size, offset) = impl->readInto(memory.data(), memory.size(), file_offset_of_buffer_end, bytes_to_ignore);
bytes_to_ignore = 0;
ProfileEvents::increment(ProfileEvents::RemoteFSUnprefetchedReads);
ProfileEvents::increment(ProfileEvents::RemoteFSUnprefetchedBytes, size);
}
bytes_to_ignore = 0;
chassert(size >= offset);
size_t bytes_read = size - offset;
@ -269,7 +264,7 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
}
else if (whence == SEEK_CUR)
{
new_pos = file_offset_of_buffer_end - (working_buffer.end() - pos) + offset;
new_pos = static_cast<size_t>(getPosition()) + offset;
}
else
{
@ -277,13 +272,15 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
}
/// Position is unchanged.
if (new_pos + (working_buffer.end() - pos) == file_offset_of_buffer_end)
if (new_pos == static_cast<size_t>(getPosition()))
return new_pos;
bool read_from_prefetch = false;
while (true)
{
if (file_offset_of_buffer_end - working_buffer.size() <= new_pos && new_pos <= file_offset_of_buffer_end)
/// The first condition implies bytes_to_ignore = 0.
if (!working_buffer.empty() && file_offset_of_buffer_end - working_buffer.size() <= new_pos &&
new_pos <= file_offset_of_buffer_end)
{
/// Position is still inside the buffer.
/// Probably it is at the end of the buffer - then we will load data on the following 'next' call.
@ -320,6 +317,7 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
/// First reset the buffer so the next read will fetch new data to the buffer.
resetWorkingBuffer();
bytes_to_ignore = 0;
if (read_until_position && new_pos > *read_until_position)
{
@ -356,6 +354,12 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
}
off_t AsynchronousReadIndirectBufferFromRemoteFS::getPosition()
{
return file_offset_of_buffer_end - available() + bytes_to_ignore;
}
void AsynchronousReadIndirectBufferFromRemoteFS::finalize()
{
resetPrefetch(FilesystemPrefetchState::UNNEEDED);

View File

@ -42,7 +42,7 @@ public:
off_t seek(off_t offset_, int whence) override;
off_t getPosition() override { return file_offset_of_buffer_end - available(); }
off_t getPosition() override;
String getFileName() const override;
@ -89,6 +89,8 @@ private:
std::string current_reader_id;
/// If nonzero then working_buffer is empty.
/// If a prefetch is in flight, the prefetch task has been instructed to ignore this many bytes.
size_t bytes_to_ignore = 0;
std::optional<size_t> read_until_position;

View File

@ -211,10 +211,16 @@ void CachedOnDiskWriteBufferFromFile::nextImpl()
{
size_t size = offset();
/// Write data to cache.
cacheData(working_buffer.begin(), size, throw_on_error_from_cache);
current_download_offset += size;
try
{
SwapHelper swap(*this, *impl);
/// Write data to the underlying buffer.
/// Actually WriteBufferFromFileDecorator::nextImpl should be called here, but it is a private method.
/// In particular, WriteBufferFromFileDecorator introduces swap logic in order to achieve delegation.
impl->next();
}
catch (...)
@ -225,10 +231,6 @@ void CachedOnDiskWriteBufferFromFile::nextImpl()
throw;
}
/// Write data to cache.
cacheData(working_buffer.begin(), size, throw_on_error_from_cache);
current_download_offset += size;
}
void CachedOnDiskWriteBufferFromFile::cacheData(char * data, size_t size, bool throw_on_error)
@ -292,8 +294,7 @@ void CachedOnDiskWriteBufferFromFile::finalizeImpl()
{
try
{
SwapHelper swap(*this, *impl);
impl->finalize();
WriteBufferFromFileDecorator::finalizeImpl();
}
catch (...)
{

View File

@ -161,7 +161,7 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
WriteMode mode, // S3 doesn't support append, only rewrite
std::optional<ObjectAttributes> attributes,
FinalizeCallback && finalize_callback,
size_t buf_size,
size_t buf_size [[maybe_unused]],
const WriteSettings & write_settings)
{
WriteSettings disk_write_settings = IObjectStorage::patchSettings(write_settings);
@ -180,7 +180,6 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
object.remote_path,
settings_ptr->request_settings,
attributes,
buf_size,
std::move(scheduler),
disk_write_settings);

View File

@ -0,0 +1,234 @@
#include <IO/Resource/DynamicResourceManager.h>
#include <IO/SchedulerNodeFactory.h>
#include <IO/ResourceManagerFactory.h>
#include <IO/ISchedulerQueue.h>
#include <Common/Exception.h>
#include <Common/StringUtils/StringUtils.h>
#include <map>
#include <tuple>
namespace DB
{
namespace ErrorCodes
{
extern const int RESOURCE_ACCESS_DENIED;
extern const int RESOURCE_NOT_FOUND;
extern const int INVALID_SCHEDULER_NODE;
}
DynamicResourceManager::State::State(EventQueue * event_queue, const Poco::Util::AbstractConfiguration & config)
: classifiers(config)
{
Poco::Util::AbstractConfiguration::Keys keys;
const String config_prefix = "resources";
config.keys(config_prefix, keys);
// Create resource for every element under <resources> tag
for (const auto & key : keys)
{
resources.emplace(key, std::make_shared<Resource>(key, event_queue, config, config_prefix + "." + key));
}
}
DynamicResourceManager::State::Resource::Resource(
const String & name,
EventQueue * event_queue,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix)
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
// Sort nodes by path to create parents before children
std::map<String, String> path2key;
for (const auto & key : keys)
{
if (!startsWith(key, "node"))
continue;
String path = config.getString(config_prefix + "." + key + "[@path]", "");
if (path.empty())
throw Exception(ErrorCodes::INVALID_SCHEDULER_NODE, "Attribute 'path' must be specified in all nodes for resource '{}'", name);
if (path[0] != '/')
throw Exception(ErrorCodes::INVALID_SCHEDULER_NODE, "Path must start with '/' for resource '{}'", name);
if (auto [_, inserted] = path2key.emplace(path, key); !inserted)
throw Exception(ErrorCodes::INVALID_SCHEDULER_NODE, "Duplicate path '{}' for resource '{}'", path, name);
}
// Create nodes
bool has_root = false;
for (auto [path, key] : path2key)
{
// Validate path
size_t slash = path.rfind('/');
if (slash == String::npos)
throw Exception(ErrorCodes::INVALID_SCHEDULER_NODE, "Invalid scheduler node path '{}' for resource '{}'", path, name);
// Create node
String basename = path.substr(slash + 1); // root name is empty string
auto [iter, _] = nodes.emplace(path, Node(basename, event_queue, config, config_prefix + "." + key));
if (path == "/")
{
has_root = true;
continue;
}
// Attach created node to parent (if not root)
// NOTE: resource root is attached to the scheduler using event queue for thread-safety
String parent_path = path.substr(0, slash);
if (parent_path.empty())
parent_path = "/";
if (auto parent = nodes.find(parent_path); parent != nodes.end())
parent->second.ptr->attachChild(iter->second.ptr);
else
throw Exception(ErrorCodes::INVALID_SCHEDULER_NODE, "Parent node doesn't exist for path '{}' for resource '{}'", path, name);
}
if (!has_root)
throw Exception(ErrorCodes::INVALID_SCHEDULER_NODE, "undefined root node path '/' for resource '{}'", name);
}
DynamicResourceManager::State::Resource::~Resource()
{
// NOTE: we should rely on `attached_to` and cannot use `parent`,
// NOTE: because `parent` can be `nullptr` in case attachment is still in event queue
if (attached_to != nullptr)
{
ISchedulerNode * root = nodes.find("/")->second.ptr.get();
attached_to->event_queue->enqueue([scheduler = attached_to, root]
{
scheduler->removeChild(root);
});
}
}
DynamicResourceManager::State::Node::Node(const String & name, EventQueue * event_queue, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
: type(config.getString(config_prefix + ".type", "fifo"))
, ptr(SchedulerNodeFactory::instance().get(type, event_queue, config, config_prefix))
{
ptr->basename = name;
}
bool DynamicResourceManager::State::Resource::equals(const DynamicResourceManager::State::Resource & o) const
{
if (nodes.size() != o.nodes.size())
return false;
for (const auto & [path, o_node] : o.nodes)
{
auto iter = nodes.find(path);
if (iter == nodes.end())
return false;
if (!iter->second.equals(o_node))
return false;
}
return true;
}
bool DynamicResourceManager::State::Node::equals(const DynamicResourceManager::State::Node & o) const
{
if (type != o.type)
return false;
return ptr->equals(o.ptr.get());
}
DynamicResourceManager::Classifier::Classifier(const DynamicResourceManager::StatePtr & state_, const String & classifier_name)
: state(state_)
{
// State is immutable, but nodes are mutable and thread-safe
// So it's safe to obtain node pointers w/o lock
for (auto [resource_name, path] : state->classifiers.get(classifier_name))
{
if (auto resource_iter = state->resources.find(resource_name); resource_iter != state->resources.end())
{
const auto & resource = resource_iter->second;
if (auto node_iter = resource->nodes.find(path); node_iter != resource->nodes.end())
{
if (auto * queue = dynamic_cast<ISchedulerQueue *>(node_iter->second.ptr.get()))
resources.emplace(resource_name, ResourceLink{.queue = queue});
else
throw Exception(ErrorCodes::RESOURCE_NOT_FOUND, "Unable to access non-queue node at path '{}' for resource '{}'", path, resource_name);
}
else
throw Exception(ErrorCodes::RESOURCE_NOT_FOUND, "Path '{}' for resource '{}' does not exist", path, resource_name);
}
else
resources.emplace(resource_name, ResourceLink{}); // resource not configured yet - use unlimited resource
}
}
ResourceLink DynamicResourceManager::Classifier::get(const String & resource_name)
{
if (auto iter = resources.find(resource_name); iter != resources.end())
return iter->second;
else
throw Exception(ErrorCodes::RESOURCE_ACCESS_DENIED, "Access denied to resource '{}'", resource_name);
}
DynamicResourceManager::DynamicResourceManager()
: state(new State())
{
scheduler.start();
}
void DynamicResourceManager::updateConfiguration(const Poco::Util::AbstractConfiguration & config)
{
StatePtr new_state = std::make_shared<State>(scheduler.event_queue, config);
std::lock_guard lock{mutex};
// Resource update leads to loss of runtime data of nodes and may lead to temporary violation of constraints (e.g. limits)
// Try to minimise this by reusing "equal" resources (initialized with the same configuration).
for (auto & [name, new_resource] : new_state->resources)
{
if (auto iter = state->resources.find(name); iter != state->resources.end()) // Resource update
{
State::ResourcePtr old_resource = iter->second;
if (old_resource->equals(*new_resource))
new_resource = old_resource; // Rewrite with older version to avoid loss of runtime data
}
}
// Commit new state
// NOTE: dtor will detach from scheduler old resources that are not in use currently
state = new_state;
// Attach new and updated resources to the scheduler
for (auto & [name, resource] : new_state->resources)
{
const SchedulerNodePtr & root = resource->nodes.find("/")->second.ptr;
if (root->parent == nullptr)
{
resource->attached_to = &scheduler;
scheduler.event_queue->enqueue([this, root]
{
scheduler.attachChild(root);
});
}
}
// NOTE: after the mutex is unlocked, `state` becomes available to Classifier(s) and must be immutable
}
ClassifierPtr DynamicResourceManager::acquire(const String & classifier_name)
{
// Acquire a reference to the current state
StatePtr state_;
{
std::lock_guard lock{mutex};
state_ = state;
}
return std::make_shared<Classifier>(state_, classifier_name);
}
void registerDynamicResourceManager(ResourceManagerFactory & factory)
{
factory.registerMethod<DynamicResourceManager>("dynamic");
}
}

View File

@ -0,0 +1,93 @@
#pragma once
#include <IO/IResourceManager.h>
#include <IO/SchedulerRoot.h>
#include <IO/Resource/ClassifiersConfig.h>
#include <mutex>
namespace DB
{
/*
* Implementation of `IResourceManager` supporting arbitrary dynamic hierarchy of scheduler nodes.
* All resources are controlled by single root `SchedulerRoot`.
*
* The state of the manager is a set of resources attached to the scheduler. States are referenced by classifiers.
* Classifiers are used (1) to access resources and (2) to keep shared ownership of resources with pending
* resource requests. This allows `ResourceRequest` and `ResourceLink` to hold raw pointers as long as
* `ClassifierPtr` is acquired and held.
*
* The manager can update its configuration after initialization. During an update, new versions of resources are also
* attached to the scheduler, so multiple versions can coexist for a short period. This will violate constraints
* (e.g. in-flight limits), because different versions have independent nodes to impose constraints; the same
* violation applies to fairness. An old version exists as long as there is at least one classifier
* instance referencing it. Classifiers are typically attached to queries and will be destructed with them.
*/
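
As a condensed illustration of the lifecycle described above (modeled on the `IOResourceDynamicResourceManager.Smoke` test further down, whose `res1`/`A` resource and classifier names it reuses), a classifier acquired from the manager pins the `State` it was created from, so its `ResourceLink`s stay valid even after `updateConfiguration()` installs a new state. This is only a sketch; header paths other than `DynamicResourceManager.h` are assumptions, not taken from this diff.

```cpp
#include <IO/Resource/DynamicResourceManager.h>
#include <IO/ResourceGuard.h>                       // assumed location of ResourceGuard
#include <Poco/AutoPtr.h>
#include <Poco/Util/XMLConfiguration.h>
#include <sstream>
#include <string>

void sketch(const std::string & config_xml)
{
    std::istringstream xml_stream(config_xml);      // e.g. the <resources>/<classifiers> XML from the Smoke test
    Poco::AutoPtr<Poco::Util::XMLConfiguration> config(new Poco::Util::XMLConfiguration(xml_stream));

    DB::DynamicResourceManager manager;
    manager.updateConfiguration(*config);

    // The classifier holds a shared_ptr to the State it was built from,
    // so this link remains usable even if updateConfiguration() runs again.
    DB::ClassifierPtr classifier = manager.acquire("A");
    DB::ResourceLink link = classifier->get("res1");

    DB::ResourceGuard guard(link);                  // blocks until the scheduler grants the request
    /// ... perform the I/O accounted against "res1" ...
}
```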
class DynamicResourceManager : public IResourceManager
{
public:
DynamicResourceManager();
void updateConfiguration(const Poco::Util::AbstractConfiguration & config) override;
ClassifierPtr acquire(const String & classifier_name) override;
private:
/// Holds everything required to work with one specific configuration
struct State
{
struct Node
{
String type;
SchedulerNodePtr ptr;
Node(
const String & name,
EventQueue * event_queue,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix);
bool equals(const Node & o) const;
};
struct Resource
{
std::unordered_map<String, Node> nodes; // by path
SchedulerRoot * attached_to = nullptr;
Resource(
const String & name,
EventQueue * event_queue,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix);
~Resource(); // unregisters resource from scheduler
bool equals(const Resource & o) const;
};
using ResourcePtr = std::shared_ptr<Resource>;
std::unordered_map<String, ResourcePtr> resources; // by name
ClassifiersConfig classifiers;
State() = default;
explicit State(EventQueue * event_queue, const Poco::Util::AbstractConfiguration & config);
};
using StatePtr = std::shared_ptr<State>;
/// Created per query, holds State used by that query
class Classifier : public IClassifier
{
public:
Classifier(const StatePtr & state_, const String & classifier_name);
ResourceLink get(const String & resource_name) override;
private:
std::unordered_map<String, ResourceLink> resources; // accessible resources by names
StatePtr state; // hold state to avoid ResourceLink invalidation due to resource deregistration from SchedulerRoot
};
private:
SchedulerRoot scheduler;
std::mutex mutex;
StatePtr state;
};
}

View File

@ -0,0 +1,13 @@
#include <IO/Resource/FairPolicy.h>
#include <IO/SchedulerNodeFactory.h>
namespace DB
{
void registerFairPolicy(SchedulerNodeFactory & factory)
{
factory.registerMethod<FairPolicy>("fair");
}
}

View File

@ -0,0 +1,232 @@
#pragma once
#include <IO/ISchedulerQueue.h>
#include <IO/SchedulerRoot.h>
#include <Common/Stopwatch.h>
#include <algorithm>
#include <unordered_map>
#include <vector>
namespace DB
{
namespace ErrorCodes
{
extern const int INVALID_SCHEDULER_NODE;
}
/*
* Scheduler node that implements weight-based fair scheduling policy.
* Based on Start-time Fair Queueing (SFQ) algorithm.
*
* Algorithm description.
* Virtual runtime (total consumed cost divided by child weight) is tracked for every child.
* The active child with the minimum vruntime is selected to be dequeued next. On activation, the initial vruntime
* of a child is set to the vruntime of the "start" of the last request. This guarantees immediate processing
* of at least a single request of newly activated children and thus the best isolation and scheduling latency.
*/
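
To make the vruntime bookkeeping above concrete, here is a tiny standalone simulation of just the weighted selection rule (ignoring activation, the heap, and the `system_vruntime`/reset logic): the child with the smallest vruntime is served, and its vruntime grows by `cost / weight`. With weights 1.0 and 3.0 and a fixed cost of 10 it reproduces the 1:3 split that the `FairnessWeights` unit test below checks. This is an illustrative sketch, not the actual `FairPolicy` implementation.

```cpp
#include <iostream>
#include <iterator>
#include <map>
#include <string>

int main()
{
    struct Child { double weight; double vruntime = 0; int served = 0; };
    std::map<std::string, Child> children{{"A", Child{1.0}}, {"B", Child{3.0}}};
    const double cost = 10;

    for (int i = 0; i < 8; ++i)
    {
        // Pick the child with the minimum vruntime (ties go to "A", the first key).
        auto best = children.begin();
        for (auto it = std::next(children.begin()); it != children.end(); ++it)
            if (it->second.vruntime < best->second.vruntime)
                best = it;

        best->second.vruntime += cost / best->second.weight;  // vruntime = consumed cost / weight
        ++best->second.served;
        std::cout << "dequeue " << i << " -> " << best->first << '\n';
    }

    // After 8 dequeues A has served 2 requests and B has served 6, i.e. a 1:3 ratio.
    for (const auto & [name, child] : children)
        std::cout << name << " served " << child.served << " requests\n";
}
```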
class FairPolicy : public ISchedulerNode
{
/// Scheduling state of a child
struct Item
{
ISchedulerNode * child = nullptr;
double vruntime = 0; /// total consumed cost divided by child weight
/// For min-heap by vruntime
bool operator<(const Item & rhs) const noexcept
{
return vruntime > rhs.vruntime;
}
};
public:
explicit FairPolicy(EventQueue * event_queue_, const Poco::Util::AbstractConfiguration & config = emptyConfig(), const String & config_prefix = {})
: ISchedulerNode(event_queue_, config, config_prefix)
{}
bool equals(ISchedulerNode * other) override
{
if (auto * o = dynamic_cast<FairPolicy *>(other))
return true;
return false;
}
void attachChild(const SchedulerNodePtr & child) override
{
// Take ownership
if (auto [it, inserted] = children.emplace(child->basename, child); !inserted)
throw Exception(
ErrorCodes::INVALID_SCHEDULER_NODE,
"Can't add another child with the same path: {}",
it->second->getPath());
// Attach
child->setParent(this);
// At first attach as inactive child.
// Inactive attached child must have `info.parent.idx` equal to its index inside the `items` array.
// This is needed to avoid later scanning through inactive `items` in O(N). Important optimization.
// NOTE: vruntime must be equal to `system_vruntime` for fairness.
child->info.parent.idx = items.size();
items.emplace_back(Item{child.get(), system_vruntime});
// Activate child if it is not empty
if (child->isActive())
activateChildImpl(items.size() - 1);
}
void removeChild(ISchedulerNode * child) override
{
if (auto iter = children.find(child->basename); iter != children.end())
{
SchedulerNodePtr removed = iter->second;
// Deactivate: detach is not very common operation, so we can afford O(N) here
size_t child_idx = 0;
[[ maybe_unused ]] bool found = false;
for (; child_idx != items.size(); child_idx++)
{
if (items[child_idx].child == removed.get())
{
found = true;
break;
}
}
assert(found);
if (child_idx < heap_size) // Detach of active child requires deactivation at first
{
heap_size--;
std::swap(items[child_idx], items[heap_size]);
// Element was removed from inside of heap -- heap must be rebuilt
std::make_heap(items.begin(), items.begin() + heap_size);
child_idx = heap_size;
}
// Now detach inactive child
if (child_idx != items.size() - 1)
{
std::swap(items[child_idx], items.back());
items[child_idx].child->info.parent.idx = child_idx;
}
items.pop_back();
// Detach
removed->setParent(nullptr);
// Get rid of ownership
children.erase(iter);
}
}
ISchedulerNode * getChild(const String & child_name) override
{
if (auto iter = children.find(child_name); iter != children.end())
return iter->second.get();
else
return nullptr;
}
std::pair<ResourceRequest *, bool> dequeueRequest() override
{
if (heap_size == 0)
return {nullptr, false};
// Recursively pull request from child
auto [request, child_active] = items.front().child->dequeueRequest();
assert(request != nullptr);
std::pop_heap(items.begin(), items.begin() + heap_size);
Item & current = items[heap_size - 1];
// SFQ fairness invariant: system vruntime equals last served request start-time
assert(current.vruntime >= system_vruntime);
system_vruntime = current.vruntime;
// By definition vruntime is amount of consumed resource (cost) divided by weight
current.vruntime += double(request->cost) / current.child->info.weight;
max_vruntime = std::max(max_vruntime, current.vruntime);
if (child_active) // Put active child back in heap after vruntime update
{
std::push_heap(items.begin(), items.begin() + heap_size);
}
else // Deactivate child if it is empty, but remember its vruntime for later activations
{
heap_size--;
// Store index of this inactive child in `parent.idx`
// This enables O(1) search of inactive children instead of O(n)
current.child->info.parent.idx = heap_size;
}
// Reset any difference between children on busy period end
if (heap_size == 0)
{
// Reset vtime to zero to avoid floating-point error accumulation,
// but do not reset too often, because it's O(N)
UInt64 ns = clock_gettime_ns();
if (last_reset_ns + 1000000000 < ns)
{
last_reset_ns = ns;
for (Item & item : items)
item.vruntime = 0;
max_vruntime = 0;
}
system_vruntime = max_vruntime;
}
return {request, heap_size > 0};
}
bool isActive() override
{
return heap_size > 0;
}
void activateChild(ISchedulerNode * child) override
{
// Find this child; this is O(1), thanks to inactive index we hold in `parent.idx`
activateChildImpl(child->info.parent.idx);
}
private:
void activateChildImpl(size_t inactive_idx)
{
bool activate_parent = heap_size == 0;
if (heap_size != inactive_idx)
{
std::swap(items[heap_size], items[inactive_idx]);
items[inactive_idx].child->info.parent.idx = inactive_idx;
}
// Newly activated child should have at least `system_vruntime` to keep fairness
items[heap_size].vruntime = std::max(system_vruntime, items[heap_size].vruntime);
heap_size++;
std::push_heap(items.begin(), items.begin() + heap_size);
// Recursive activation
if (activate_parent && parent)
parent->activateChild(this);
}
private:
/// Beginning of `items` vector is heap of active children: [0; `heap_size`).
/// Next go inactive children in unsorted order.
/// NOTE: we have to track vruntime of inactive children for max-min fairness.
std::vector<Item> items;
size_t heap_size = 0;
/// Last request vruntime
double system_vruntime = 0;
double max_vruntime = 0;
UInt64 last_reset_ns = 0;
/// All children with ownership
std::unordered_map<String, SchedulerNodePtr> children; // basename -> child
};
}

View File

@ -4,11 +4,13 @@
namespace DB
{
void registerDynamicResourceManager(ResourceManagerFactory &);
void registerStaticResourceManager(ResourceManagerFactory &);
void registerResourceManagers()
{
auto & factory = ResourceManagerFactory::instance();
registerDynamicResourceManager(factory);
registerStaticResourceManager(factory);
}

View File

@ -8,6 +8,7 @@ namespace DB
{
void registerPriorityPolicy(SchedulerNodeFactory &);
void registerFairPolicy(SchedulerNodeFactory &);
void registerSemaphoreConstraint(SchedulerNodeFactory &);
void registerFifoQueue(SchedulerNodeFactory &);
@ -17,6 +18,7 @@ void registerSchedulerNodes()
// ISchedulerNode
registerPriorityPolicy(factory);
registerFairPolicy(factory);
// ISchedulerConstraint
registerSemaphoreConstraint(factory);

View File

@ -0,0 +1,187 @@
#include <gtest/gtest.h>
#include <IO/Resource/tests/ResourceTest.h>
#include <IO/Resource/FairPolicy.h>
using namespace DB;
using ResourceTest = ResourceTestClass;
TEST(IOResourceFairPolicy, Factory)
{
ResourceTest t;
Poco::AutoPtr cfg = new Poco::Util::XMLConfiguration();
SchedulerNodePtr fair = SchedulerNodeFactory::instance().get("fair", /* event_queue = */ nullptr, *cfg, "");
EXPECT_TRUE(dynamic_cast<FairPolicy *>(fair.get()) != nullptr);
}
TEST(IOResourceFairPolicy, FairnessWeights)
{
ResourceTest t;
t.add<FairPolicy>("/");
t.add<FifoQueue>("/A", "<weight>1.0</weight>");
t.add<FifoQueue>("/B", "<weight>3.0</weight>");
t.enqueue("/A", {10, 10, 10, 10, 10, 10, 10, 10});
t.enqueue("/B", {10, 10, 10, 10, 10, 10, 10, 10});
t.dequeue(4);
t.consumed("A", 10);
t.consumed("B", 30);
t.dequeue(4);
t.consumed("A", 10);
t.consumed("B", 30);
t.dequeue();
t.consumed("A", 60);
t.consumed("B", 20);
}
TEST(IOResourceFairPolicy, Activation)
{
ResourceTest t;
t.add<FairPolicy>("/");
t.add<FifoQueue>("/A");
t.add<FifoQueue>("/B");
t.add<FifoQueue>("/C");
t.enqueue("/A", {10, 10, 10, 10, 10, 10, 10, 10});
t.enqueue("/B", {10});
t.enqueue("/C", {10, 10});
t.dequeue(3);
t.consumed("A", 10);
t.consumed("B", 10);
t.consumed("C", 10);
t.dequeue(4);
t.consumed("A", 30);
t.consumed("B", 0);
t.consumed("C", 10);
t.enqueue("/B", {10, 10});
t.dequeue(1);
t.consumed("B", 10);
t.enqueue("/C", {10, 10});
t.dequeue(1);
t.consumed("C", 10);
t.dequeue(2); // A B or B A
t.consumed("A", 10);
t.consumed("B", 10);
}
TEST(IOResourceFairPolicy, FairnessMaxMin)
{
ResourceTest t;
t.add<FairPolicy>("/");
t.add<FifoQueue>("/A");
t.add<FifoQueue>("/B");
t.enqueue("/A", {10, 10}); // make sure A is never empty
for (int i = 0; i < 10; i++)
{
t.enqueue("/A", {10, 10, 10, 10});
t.enqueue("/B", {10, 10});
t.dequeue(6);
t.consumed("A", 40);
t.consumed("B", 20);
}
t.dequeue(2);
t.consumed("A", 20);
}
TEST(IOResourceFairPolicy, HierarchicalFairness)
{
ResourceTest t;
t.add<FairPolicy>("/");
t.add<FairPolicy>("/X");
t.add<FairPolicy>("/Y");
t.add<FifoQueue>("/X/A");
t.add<FifoQueue>("/X/B");
t.add<FifoQueue>("/Y/C");
t.add<FifoQueue>("/Y/D");
t.enqueue("/X/A", {10, 10, 10, 10, 10, 10, 10, 10});
t.enqueue("/X/B", {10, 10, 10, 10, 10, 10, 10, 10});
t.enqueue("/Y/C", {10, 10, 10, 10, 10, 10, 10, 10});
t.enqueue("/Y/D", {10, 10, 10, 10, 10, 10, 10, 10});
for (int i = 0; i < 4; i++)
{
t.dequeue(8);
t.consumed("A", 20);
t.consumed("B", 20);
t.consumed("C", 20);
t.consumed("D", 20);
}
t.enqueue("/X/A", {10, 10, 10, 10, 10, 10, 10, 10});
t.enqueue("/X/A", {10, 10, 10, 10, 10, 10, 10, 10});
t.enqueue("/Y/C", {10, 10, 10, 10, 10, 10, 10, 10});
t.enqueue("/Y/D", {10, 10, 10, 10, 10, 10, 10, 10});
for (int i = 0; i < 4; i++)
{
t.dequeue(8);
t.consumed("A", 40);
t.consumed("C", 20);
t.consumed("D", 20);
}
t.enqueue("/X/B", {10, 10, 10, 10, 10, 10, 10, 10});
t.enqueue("/X/B", {10, 10, 10, 10, 10, 10, 10, 10});
t.enqueue("/Y/C", {10, 10, 10, 10, 10, 10, 10, 10});
t.enqueue("/Y/D", {10, 10, 10, 10, 10, 10, 10, 10});
for (int i = 0; i < 4; i++)
{
t.dequeue(8);
t.consumed("B", 40);
t.consumed("C", 20);
t.consumed("D", 20);
}
t.enqueue("/X/A", {10, 10, 10, 10, 10, 10, 10, 10});
t.enqueue("/X/B", {10, 10, 10, 10, 10, 10, 10, 10});
t.enqueue("/Y/C", {10, 10, 10, 10, 10, 10, 10, 10});
t.enqueue("/Y/C", {10, 10, 10, 10, 10, 10, 10, 10});
for (int i = 0; i < 4; i++)
{
t.dequeue(8);
t.consumed("A", 20);
t.consumed("B", 20);
t.consumed("C", 40);
}
t.enqueue("/X/A", {10, 10, 10, 10, 10, 10, 10, 10});
t.enqueue("/X/B", {10, 10, 10, 10, 10, 10, 10, 10});
t.enqueue("/Y/D", {10, 10, 10, 10, 10, 10, 10, 10});
t.enqueue("/Y/D", {10, 10, 10, 10, 10, 10, 10, 10});
for (int i = 0; i < 4; i++)
{
t.dequeue(8);
t.consumed("A", 20);
t.consumed("B", 20);
t.consumed("D", 40);
}
t.enqueue("/X/A", {10, 10, 10, 10, 10, 10, 10, 10});
t.enqueue("/X/A", {10, 10, 10, 10, 10, 10, 10, 10});
t.enqueue("/Y/D", {10, 10, 10, 10, 10, 10, 10, 10});
t.enqueue("/Y/D", {10, 10, 10, 10, 10, 10, 10, 10});
for (int i = 0; i < 4; i++)
{
t.dequeue(8);
t.consumed("A", 40);
t.consumed("D", 40);
}
}

View File

@ -0,0 +1,116 @@
#include <gtest/gtest.h>
#include <IO/Resource/tests/ResourceTest.h>
#include <IO/Resource/DynamicResourceManager.h>
#include <Poco/Util/XMLConfiguration.h>
using namespace DB;
using ResourceTest = ResourceTestManager<DynamicResourceManager>;
using TestGuard = ResourceTest::Guard;
TEST(IOResourceDynamicResourceManager, Smoke)
{
ResourceTest t;
t.update(R"CONFIG(
<clickhouse>
<resources>
<res1>
<node path="/"><type>inflight_limit</type><max_requests>10</max_requests></node>
<node path="/fair"><type>fair</type></node>
<node path="/fair/A"><type>fifo</type></node>
<node path="/fair/B"><type>fifo</type><weight>3</weight></node>
</res1>
</resources>
<classifiers>
<A><res1>/fair/A</res1></A>
<B><res1>/fair/B</res1></B>
</classifiers>
</clickhouse>
)CONFIG");
ClassifierPtr cA = t.manager->acquire("A");
ClassifierPtr cB = t.manager->acquire("B");
for (int i = 0; i < 10; i++)
{
ResourceGuard gA(cA->get("res1"), ResourceGuard::PostponeLocking);
gA.lock();
gA.setFailure();
gA.unlock();
ResourceGuard gB(cB->get("res1"));
}
}
TEST(IOResourceDynamicResourceManager, Fairness)
{
constexpr size_t T = 3; // threads per queue
int N = 100; // requests per thread
ResourceTest t(2 * T + 1);
t.update(R"CONFIG(
<clickhouse>
<resources>
<res1>
<node path="/"> <type>inflight_limit</type><max_requests>1</max_requests></node>
<node path="/fair"> <type>fair</type></node>
<node path="/fair/A"> <type>fifo</type></node>
<node path="/fair/B"> <type>fifo</type></node>
<node path="/fair/leader"><type>fifo</type></node>
</res1>
</resources>
<classifiers>
<A><res1>/fair/A</res1></A>
<B><res1>/fair/B</res1></B>
<leader><res1>/fair/leader</res1></leader>
</classifiers>
</clickhouse>
)CONFIG");
// Total cost for A and B cannot differ by more than 1 (every request has a cost of 1).
// Requests from A use `value = 1` and requests from B use `value = -1`.
std::atomic<Int64> unfairness = 0;
auto fairness_diff = [&] (Int64 value)
{
Int64 cur_unfairness = unfairness.fetch_add(value, std::memory_order_relaxed) + value;
EXPECT_NEAR(cur_unfairness, 0, 1);
};
for (int thr = 0; thr < T; thr++)
{
t.threads.emplace_back([&]
{
ClassifierPtr c = t.manager->acquire("A");
ResourceLink link = c->get("res1");
t.startBusyPeriod(link, 1, N);
for (int req = 0; req < N; req++)
{
TestGuard g(t, link, 1);
fairness_diff(1);
}
});
}
for (int thr = 0; thr < T; thr++)
{
t.threads.emplace_back([&]
{
ClassifierPtr c = t.manager->acquire("B");
ResourceLink link = c->get("res1");
t.startBusyPeriod(link, 1, N);
for (int req = 0; req < N; req++)
{
TestGuard g(t, link, 1);
fairness_diff(-1);
}
});
}
ClassifierPtr c = t.manager->acquire("leader");
ResourceLink link = c->get("res1");
t.blockResource(link);
}
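
The fairness check above can be reduced to a small invariant: under a fair scheduler with a single request in flight, a counter that adds +1 for every A request and -1 for every B request never drifts beyond 1 in absolute value. A self-contained sketch of that invariant (illustrative only, not the real test harness):

```cpp
#include <atomic>
#include <cassert>
#include <cstdlib>

int main()
{
    std::atomic<long> unfairness{0};
    auto account = [&](long value)
    {
        long cur = unfairness.fetch_add(value, std::memory_order_relaxed) + value;
        assert(std::labs(cur) <= 1); // the running sum stays within +-1
    };
    // A fair scheduler alternates A (+1) and B (-1) requests.
    for (int i = 0; i < 100; ++i)
    {
        account(+1);
        account(-1);
    }
    return 0;
}
```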

View File

@ -255,7 +255,7 @@ Model::HeadObjectOutcome Client::HeadObject(const HeadObjectRequest & request) c
if (auto uri = getURIForBucket(bucket); uri.has_value())
request.overrideURI(std::move(*uri));
auto result = Aws::S3::S3Client::HeadObject(request);
auto result = HeadObject(static_cast<const Model::HeadObjectRequest&>(request));
if (result.IsSuccess())
return result;
@ -312,70 +312,75 @@ Model::HeadObjectOutcome Client::HeadObject(const HeadObjectRequest & request) c
request.overrideURI(std::move(*bucket_uri));
return Aws::S3::S3Client::HeadObject(request);
/// The next call is NOT a recursive call
/// This is a virtual call Aws::S3::S3Client::HeadObject(const Model::HeadObjectRequest&)
return HeadObject(static_cast<const Model::HeadObjectRequest&>(request));
}
/// For each request, we wrap the request functions from Aws::S3::Client with doRequest
/// doRequest calls the virtual function from Aws::S3::Client, while DB::S3::Client has no virtual calls for each request type
Model::ListObjectsV2Outcome Client::ListObjectsV2(const ListObjectsV2Request & request) const
{
return doRequest(request, [this](const Model::ListObjectsV2Request & req) { return Aws::S3::S3Client::ListObjectsV2(req); });
return doRequest(request, [this](const Model::ListObjectsV2Request & req) { return ListObjectsV2(req); });
}
Model::ListObjectsOutcome Client::ListObjects(const ListObjectsRequest & request) const
{
return doRequest(request, [this](const Model::ListObjectsRequest & req) { return Aws::S3::S3Client::ListObjects(req); });
return doRequest(request, [this](const Model::ListObjectsRequest & req) { return ListObjects(req); });
}
Model::GetObjectOutcome Client::GetObject(const GetObjectRequest & request) const
{
return doRequest(request, [this](const Model::GetObjectRequest & req) { return Aws::S3::S3Client::GetObject(req); });
return doRequest(request, [this](const Model::GetObjectRequest & req) { return GetObject(req); });
}
Model::AbortMultipartUploadOutcome Client::AbortMultipartUpload(const AbortMultipartUploadRequest & request) const
{
return doRequest(
request, [this](const Model::AbortMultipartUploadRequest & req) { return Aws::S3::S3Client::AbortMultipartUpload(req); });
request, [this](const Model::AbortMultipartUploadRequest & req) { return AbortMultipartUpload(req); });
}
Model::CreateMultipartUploadOutcome Client::CreateMultipartUpload(const CreateMultipartUploadRequest & request) const
{
return doRequest(
request, [this](const Model::CreateMultipartUploadRequest & req) { return Aws::S3::S3Client::CreateMultipartUpload(req); });
request, [this](const Model::CreateMultipartUploadRequest & req) { return CreateMultipartUpload(req); });
}
Model::CompleteMultipartUploadOutcome Client::CompleteMultipartUpload(const CompleteMultipartUploadRequest & request) const
{
return doRequest(
request, [this](const Model::CompleteMultipartUploadRequest & req) { return Aws::S3::S3Client::CompleteMultipartUpload(req); });
request, [this](const Model::CompleteMultipartUploadRequest & req) { return CompleteMultipartUpload(req); });
}
Model::CopyObjectOutcome Client::CopyObject(const CopyObjectRequest & request) const
{
return doRequest(request, [this](const Model::CopyObjectRequest & req) { return Aws::S3::S3Client::CopyObject(req); });
return doRequest(request, [this](const Model::CopyObjectRequest & req) { return CopyObject(req); });
}
Model::PutObjectOutcome Client::PutObject(const PutObjectRequest & request) const
{
return doRequest(request, [this](const Model::PutObjectRequest & req) { return Aws::S3::S3Client::PutObject(req); });
return doRequest(request, [this](const Model::PutObjectRequest & req) { return PutObject(req); });
}
Model::UploadPartOutcome Client::UploadPart(const UploadPartRequest & request) const
{
return doRequest(request, [this](const Model::UploadPartRequest & req) { return Aws::S3::S3Client::UploadPart(req); });
return doRequest(request, [this](const Model::UploadPartRequest & req) { return UploadPart(req); });
}
Model::UploadPartCopyOutcome Client::UploadPartCopy(const UploadPartCopyRequest & request) const
{
return doRequest(request, [this](const Model::UploadPartCopyRequest & req) { return Aws::S3::S3Client::UploadPartCopy(req); });
return doRequest(request, [this](const Model::UploadPartCopyRequest & req) { return UploadPartCopy(req); });
}
Model::DeleteObjectOutcome Client::DeleteObject(const DeleteObjectRequest & request) const
{
return doRequest(request, [this](const Model::DeleteObjectRequest & req) { return Aws::S3::S3Client::DeleteObject(req); });
return doRequest(request, [this](const Model::DeleteObjectRequest & req) { return DeleteObject(req); });
}
Model::DeleteObjectsOutcome Client::DeleteObjects(const DeleteObjectsRequest & request) const
{
return doRequest(request, [this](const Model::DeleteObjectsRequest & req) { return Aws::S3::S3Client::DeleteObjects(req); });
return doRequest(request, [this](const Model::DeleteObjectsRequest & req) { return DeleteObjects(req); });
}
template <typename RequestType, typename RequestFn>

View File

@ -40,6 +40,11 @@ struct ServerSideEncryptionKMSConfig
#include <aws/core/client/AWSErrorMarshaller.h>
#include <aws/core/client/RetryStrategy.h>
namespace MockS3
{
struct Client;
}
namespace DB::S3
{
@ -195,6 +200,8 @@ public:
bool supportsMultiPartCopy() const;
private:
friend struct ::MockS3::Client;
Client(size_t max_redirects_,
ServerSideEncryptionKMSConfig sse_kms_config_,
const std::shared_ptr<Aws::Auth::AWSCredentialsProvider>& credentials_provider,

View File

@ -20,8 +20,6 @@
#include <aws/core/Aws.h>
#include <aws/s3/S3Errors.h>
namespace Aws::S3 { class Client; }
namespace DB
{

View File

@ -0,0 +1,62 @@
#include <IO/StdIStreamFromMemory.h>
namespace DB
{
StdIStreamFromMemory::MemoryBuf::MemoryBuf(char * begin_, size_t size_)
: begin(begin_)
, size(size_)
{
this->setg(begin, begin, begin + size);
}
StdIStreamFromMemory::MemoryBuf::int_type StdIStreamFromMemory::MemoryBuf::underflow()
{
if (gptr() < egptr())
return traits_type::to_int_type(*gptr());
return traits_type::eof();
}
StdIStreamFromMemory::MemoryBuf::pos_type
StdIStreamFromMemory::MemoryBuf::seekoff(off_type off, std::ios_base::seekdir way,
std::ios_base::openmode mode)
{
bool out_mode = (std::ios_base::out & mode) != 0;
if (out_mode)
return off_type(-1);
off_type ret(-1);
if (way == std::ios_base::beg)
ret = 0;
else if (way == std::ios_base::cur)
ret = gptr() - begin;
else if (way == std::ios_base::end)
ret = size;
if (ret == off_type(-1))
return ret;
ret += off;
if (!(ret >= 0 && size_t(ret) <= size))
return off_type(-1);
this->setg(begin, begin + ret, begin + size);
return pos_type(ret);
}
StdIStreamFromMemory::MemoryBuf::pos_type StdIStreamFromMemory::MemoryBuf::seekpos(pos_type sp,
std::ios_base::openmode mode)
{
return seekoff(off_type(sp), std::ios_base::beg, mode);
}
StdIStreamFromMemory::StdIStreamFromMemory(char * begin_, size_t size_)
: std::iostream(nullptr)
, mem_buf(begin_, size_)
{
init(&mem_buf);
}
}

View File

@ -0,0 +1,36 @@
#pragma once
#include <iostream>
namespace DB
{
/// StdIStreamFromMemory is used in WriteBufferFromS3 as a stream which is passed to the S3::Client
/// It provides an istream interface (reading only) over a memory region.
/// However, the S3::Client requires an iostream interface even though it only reads from the stream
class StdIStreamFromMemory : public std::iostream
{
struct MemoryBuf: std::streambuf
{
MemoryBuf(char * begin_, size_t size_);
int_type underflow() override;
pos_type seekoff(off_type off, std::ios_base::seekdir way,
std::ios_base::openmode mode) override;
pos_type seekpos(pos_type sp,
std::ios_base::openmode mode) override;
char * begin = nullptr;
size_t size = 0;
};
MemoryBuf mem_buf;
public:
StdIStreamFromMemory(char * begin_, size_t size_);
};
}
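
A stripped-down sketch of the same technique StdIStreamFromMemory uses: point a std::streambuf's get area at an existing memory region with setg() so the data can be consumed through a standard stream without copying. Toy code, not the class above:

```cpp
#include <cstddef>
#include <iostream>
#include <streambuf>
#include <string>

struct MemBuf : std::streambuf
{
    MemBuf(char * begin, size_t size) { setg(begin, begin, begin + size); }
};

int main()
{
    std::string payload = "part-1 part-2";
    MemBuf buf(payload.data(), payload.size());
    std::istream in(&buf); // read-only view over the buffer
    std::string word;
    while (in >> word)
        std::cout << word << "\n";
}
```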

17
src/IO/SwapHelper.cpp Normal file
View File

@ -0,0 +1,17 @@
#include <IO/SwapHelper.h>
namespace DB
{
SwapHelper::SwapHelper(BufferBase & b1_, BufferBase & b2_)
: b1(b1_), b2(b2_)
{
b1.swap(b2);
}
SwapHelper::~SwapHelper()
{
b1.swap(b2);
}
}

View File

@ -1,16 +1,19 @@
#pragma once
#include <IO/BufferBase.h>
namespace DB
{
class SwapHelper
{
public:
SwapHelper(BufferBase & b1_, BufferBase & b2_) : b1(b1_), b2(b2_) { b1.swap(b2); }
~SwapHelper() { b1.swap(b2); }
private:
class SwapHelper
{
public:
SwapHelper(BufferBase & b1_, BufferBase & b2_);
~SwapHelper();
private:
BufferBase & b1;
BufferBase & b2;
};
};
}
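
The RAII pattern SwapHelper implements can be shown with a toy type: swap two objects for the lifetime of a scope and undo the swap on scope exit, even if an exception is thrown in between. A minimal sketch under that assumption (toy int references, not BufferBase):

```cpp
#include <iostream>
#include <utility>

struct ScopedSwap
{
    ScopedSwap(int & a_, int & b_) : a(a_), b(b_) { std::swap(a, b); }
    ~ScopedSwap() { std::swap(a, b); } // runs on normal exit and during stack unwinding
    int & a;
    int & b;
};

int main()
{
    int x = 1, y = 2;
    {
        ScopedSwap guard(x, y);
        std::cout << x << " " << y << "\n"; // 2 1 while the guard is alive
    }
    std::cout << x << " " << y << "\n";     // 1 2 restored on scope exit
}
```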

View File

@ -42,7 +42,8 @@ public:
{
if (!offset())
return;
bytes += offset();
auto bytes_in_buffer = offset();
try
{
@ -54,9 +55,11 @@ public:
* so that later (for example, when the stack was expanded) there was no second attempt to write data.
*/
pos = working_buffer.begin();
bytes += bytes_in_buffer;
throw;
}
bytes += bytes_in_buffer;
pos = working_buffer.begin();
}

View File

@ -1,6 +1,7 @@
#include "WriteBufferFromFileDecorator.h"
#include <IO/WriteBuffer.h>
#include <IO/SwapHelper.h>
namespace DB
{
@ -13,12 +14,18 @@ WriteBufferFromFileDecorator::WriteBufferFromFileDecorator(std::unique_ptr<Write
void WriteBufferFromFileDecorator::finalizeImpl()
{
next();
/// In case of an exception in preFinalize as a part of the finalize call,
/// WriteBufferFromFileDecorator::finalized is set to true
/// but impl->finalized remains false.
/// That leads to a situation when the destructor of impl is called with impl->finalized equal to false.
if (!is_prefinalized)
WriteBufferFromFileDecorator::preFinalize();
{
SwapHelper swap(*this, *impl);
impl->finalize();
}
}
WriteBufferFromFileDecorator::~WriteBufferFromFileDecorator()
@ -31,11 +38,21 @@ WriteBufferFromFileDecorator::~WriteBufferFromFileDecorator()
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
/// It is not a mistake that swap is called here.
/// Swap has been called in the constructor, so it should be called in the destructor
/// in order to provide a valid buffer for impl's destructor call.
swap(*impl);
}
void WriteBufferFromFileDecorator::sync()
{
next();
{
SwapHelper swap(*this, *impl);
impl->sync();
}
}
std::string WriteBufferFromFileDecorator::getFileName() const
@ -45,11 +62,22 @@ std::string WriteBufferFromFileDecorator::getFileName() const
return std::string();
}
void WriteBufferFromFileDecorator::preFinalize()
{
next();
{
SwapHelper swap(*this, *impl);
impl->preFinalize();
}
is_prefinalized = true;
}
void WriteBufferFromFileDecorator::nextImpl()
{
swap(*impl);
SwapHelper swap(*this, *impl);
impl->next();
swap(*impl);
}
}

View File

@ -17,12 +17,7 @@ public:
std::string getFileName() const override;
void preFinalize() override
{
next();
impl->preFinalize();
is_prefinalized = true;
}
void preFinalize() override;
const WriteBuffer & getImpl() const { return *impl; }

File diff suppressed because it is too large

View File

@ -4,31 +4,19 @@
#if USE_AWS_S3
#include <memory>
#include <vector>
#include <list>
#include <base/types.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteSettings.h>
#include <IO/S3/Requests.h>
#include <Storages/StorageS3Settings.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <aws/core/utils/memory/stl/AWSStringStream.h>
namespace Aws::S3
{
class Client;
}
#include <memory>
#include <vector>
#include <list>
namespace DB
{
class WriteBufferFromFile;
/**
 * Buffer to write data to an S3 object with the specified bucket and key.
 * If the data size written to the buffer is less than 'max_single_part_upload_size', the write is performed using a single-part upload.
@ -45,81 +33,86 @@ public:
const String & key_,
const S3Settings::RequestSettings & request_settings_,
std::optional<std::map<String, String>> object_metadata_ = std::nullopt,
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
ThreadPoolCallbackRunner<void> schedule_ = {},
const WriteSettings & write_settings_ = {});
~WriteBufferFromS3() override;
void nextImpl() override;
void preFinalize() override;
public:
class IBufferAllocationPolicy
{
public:
virtual size_t getBufferNumber() const = 0;
virtual size_t getBufferSize() const = 0;
virtual void nextBuffer() = 0;
virtual ~IBufferAllocationPolicy() = 0;
};
using IBufferAllocationPolicyPtr = std::unique_ptr<IBufferAllocationPolicy>;
static IBufferAllocationPolicyPtr ChooseBufferPolicy(const S3Settings::RequestSettings::PartUploadSettings & settings_);
private:
void allocateBuffer();
void processWithStrictParts();
void processWithDynamicParts();
void fillCreateMultipartRequest(S3::CreateMultipartUploadRequest & req);
void createMultipartUpload();
void writePart();
void completeMultipartUpload();
void makeSinglepartUpload();
/// Receives response from the server after sending all data.
void finalizeImpl() override;
struct UploadPartTask;
void fillUploadRequest(S3::UploadPartRequest & req);
void processUploadRequest(UploadPartTask & task);
String getLogDetails() const;
struct PutObjectTask;
void fillPutRequest(S3::PutObjectRequest & req);
void processPutRequest(const PutObjectTask & task);
struct PartData;
void hidePartialData();
void allocateFirstBuffer();
void reallocateFirstBuffer();
void detachBuffer();
void allocateBuffer();
void setFakeBufferWhenPreFinalized();
void waitForReadyBackgroundTasks();
void waitForAllBackgroundTasks();
void waitForAllBackgroundTasksUnlocked(std::unique_lock<std::mutex> & bg_tasks_lock);
S3::UploadPartRequest getUploadRequest(size_t part_number, PartData & data);
void writePart(PartData && data);
void writeMultipartUpload();
void createMultipartUpload();
void completeMultipartUpload();
void abortMultipartUpload();
void tryToAbortMultipartUpload();
S3::PutObjectRequest getPutRequest(PartData & data);
void makeSinglepartUpload(PartData && data);
const String bucket;
const String key;
const S3Settings::RequestSettings request_settings;
const S3Settings::RequestSettings::PartUploadSettings & upload_settings;
const WriteSettings write_settings;
const std::shared_ptr<const S3::Client> client_ptr;
const std::optional<std::map<String, String>> object_metadata;
Poco::Logger * log = &Poco::Logger::get("WriteBufferFromS3");
/// Strict/static Part size, no adjustments will be done on fly.
size_t strict_upload_part_size = 0;
/// Part size will be adjusted on fly (for bigger uploads)
size_t current_upload_part_size = 0;
std::shared_ptr<Aws::StringStream> temporary_buffer; /// Buffer to accumulate data.
size_t last_part_size = 0;
size_t part_number = 0;
IBufferAllocationPolicyPtr buffer_allocation_policy;
/// Upload in S3 is made in parts.
/// We initiate the upload, then upload each part and get an ETag as a response, and then finalize the upload in finalizeImpl() by listing all our parts.
String multipart_upload_id;
std::vector<String> TSA_GUARDED_BY(bg_tasks_mutex) part_tags;
std::deque<String> multipart_tags;
bool multipart_upload_finished = false;
/// Track that preFinalize() is called only once
bool is_prefinalized = false;
/// Following fields are for background uploads in thread pool (if specified).
/// We use std::function to avoid dependency of Interpreters
const ThreadPoolCallbackRunner<void> schedule;
/// The first fully filled buffer has to be delayed.
/// There are two possibilities afterwards:
/// the first is to call preFinalize/finalize, which leads to a single-part upload;
/// the second is to write more data, which leads to a multipart upload.
std::deque<PartData> detached_part_data;
char fake_buffer_when_prefinalized[1] = {};
std::unique_ptr<PutObjectTask> put_object_task; /// Does not need protection by mutex because of the logic around is_finished field.
std::list<UploadPartTask> TSA_GUARDED_BY(bg_tasks_mutex) upload_object_tasks;
int num_added_bg_tasks TSA_GUARDED_BY(bg_tasks_mutex) = 0;
int num_finished_bg_tasks TSA_GUARDED_BY(bg_tasks_mutex) = 0;
/// offset() and count() are unstable inside nextImpl.
/// For example, nextImpl changes the position, hence offset() and count() change as well.
/// These variables store size information for when offset() and count() are unstable.
size_t total_size = 0;
size_t hidden_size = 0;
std::mutex bg_tasks_mutex;
std::condition_variable bg_tasks_condvar;
Poco::Logger * log = &Poco::Logger::get("WriteBufferFromS3");
WriteSettings write_settings;
class TaskTracker;
std::unique_ptr<TaskTracker> task_tracker;
};
}
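
A hedged sketch of the upload strategy this header describes: the first filled buffer is detached and held back; if finalize arrives before more data is written, the object goes out as a single PutObject, otherwise the detached buffers become parts of a multipart upload. The function names below are placeholders, not the real S3 requests:

```cpp
#include <deque>
#include <iostream>
#include <string>

// Placeholders standing in for the real S3 requests.
void uploadSinglePart(const std::string & part) { std::cout << "PutObject: " << part.size() << " bytes\n"; }
void uploadMultipart(const std::deque<std::string> & parts) { std::cout << "Multipart upload with " << parts.size() << " parts\n"; }

void finalizeUpload(std::deque<std::string> detached_parts)
{
    if (detached_parts.size() == 1)
        uploadSinglePart(detached_parts.front()); // all data fit into a single buffer
    else
        uploadMultipart(detached_parts);          // more data was written, parts were detached
}

int main()
{
    finalizeUpload({"small object"});
    finalizeUpload({"part-1", "part-2", "part-3"});
}
```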

View File

@ -0,0 +1,112 @@
#include "config.h"
#if USE_AWS_S3
#include <IO/WriteBufferFromS3.h>
#include <memory>
namespace
{
class FixedSizeBufferAllocationPolicy : public DB::WriteBufferFromS3::IBufferAllocationPolicy
{
const size_t buffer_size = 0;
size_t buffer_number = 0;
public:
explicit FixedSizeBufferAllocationPolicy(const DB::S3Settings::RequestSettings::PartUploadSettings & settings_)
: buffer_size(settings_.strict_upload_part_size)
{
chassert(buffer_size > 0);
}
size_t getBufferNumber() const override { return buffer_number; }
size_t getBufferSize() const override
{
chassert(buffer_number > 0);
return buffer_size;
}
void nextBuffer() override
{
++buffer_number;
}
};
class ExpBufferAllocationPolicy : public DB::WriteBufferFromS3::IBufferAllocationPolicy
{
const size_t first_size = 0;
const size_t second_size = 0;
const size_t multiply_factor = 0;
const size_t multiply_threshold = 0;
const size_t max_size = 0;
size_t current_size = 0;
size_t buffer_number = 0;
public:
explicit ExpBufferAllocationPolicy(const DB::S3Settings::RequestSettings::PartUploadSettings & settings_)
: first_size(std::max(settings_.max_single_part_upload_size, settings_.min_upload_part_size))
, second_size(settings_.min_upload_part_size)
, multiply_factor(settings_.upload_part_size_multiply_factor)
, multiply_threshold(settings_.upload_part_size_multiply_parts_count_threshold)
, max_size(settings_.max_upload_part_size)
{
chassert(first_size > 0);
chassert(second_size > 0);
chassert(multiply_factor >= 1);
chassert(multiply_threshold > 0);
chassert(max_size > 0);
}
size_t getBufferNumber() const override { return buffer_number; }
size_t getBufferSize() const override
{
chassert(buffer_number > 0);
return current_size;
}
void nextBuffer() override
{
++buffer_number;
if (1 == buffer_number)
{
current_size = first_size;
return;
}
if (2 == buffer_number)
current_size = second_size;
if (0 == ((buffer_number - 1) % multiply_threshold))
{
current_size *= multiply_factor;
current_size = std::min(current_size, max_size);
}
}
};
}
namespace DB
{
WriteBufferFromS3::IBufferAllocationPolicy::~IBufferAllocationPolicy() = default;
WriteBufferFromS3::IBufferAllocationPolicyPtr WriteBufferFromS3::ChooseBufferPolicy(const S3Settings::RequestSettings::PartUploadSettings & settings_)
{
if (settings_.strict_upload_part_size > 0)
return std::make_unique<FixedSizeBufferAllocationPolicy>(settings_);
else
return std::make_unique<ExpBufferAllocationPolicy>(settings_);
}
}
#endif
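
To see how ExpBufferAllocationPolicy grows the part size, here is a small standalone helper that replays the nextBuffer() rule for a toy configuration and prints the resulting size per part; the numbers are illustrative, the real values come from PartUploadSettings:

```cpp
#include <algorithm>
#include <cstddef>
#include <iostream>

// Replays the growth rule: the first part uses `first`, the second switches to `second`,
// and every `threshold` parts the size is multiplied by `factor`, clamped at `max_size`.
size_t partSize(size_t part_number, size_t first, size_t second, size_t factor, size_t threshold, size_t max_size)
{
    size_t size = 0;
    for (size_t n = 1; n <= part_number; ++n)
    {
        if (n == 1) { size = first; continue; }
        if (n == 2)
            size = second;
        if ((n - 1) % threshold == 0)
            size = std::min(size * factor, max_size);
    }
    return size;
}

int main()
{
    for (size_t n = 1; n <= 8; ++n)
        std::cout << "part " << n << " -> " << partSize(n, 32, 16, 2, 2, 128) << " MiB\n";
}
```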

View File

@ -0,0 +1,137 @@
#include "config.h"
#if USE_AWS_S3
#include <IO/WriteBufferFromS3TaskTracker.h>
namespace DB
{
WriteBufferFromS3::TaskTracker::TaskTracker(ThreadPoolCallbackRunner<void> scheduler_)
: is_async(bool(scheduler_))
, scheduler(scheduler_ ? std::move(scheduler_) : syncRunner())
{}
WriteBufferFromS3::TaskTracker::~TaskTracker()
{
safeWaitAll();
}
ThreadPoolCallbackRunner<void> WriteBufferFromS3::TaskTracker::syncRunner()
{
return [](Callback && callback, int64_t) mutable -> std::future<void>
{
auto package = std::packaged_task<void()>(std::move(callback));
/// No exceptions are propagated; exceptions are packed into the future
package();
return package.get_future();
};
}
void WriteBufferFromS3::TaskTracker::waitReady()
{
LOG_TEST(log, "waitReady, in queue {}", futures.size());
/// Exceptions are propagated
auto it = futures.begin();
while (it != futures.end())
{
chassert(it->valid());
if (it->wait_for(std::chrono::seconds(0)) != std::future_status::ready)
{
++it;
continue;
}
try
{
it->get();
} catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
throw;
}
it = futures.erase(it);
}
LOG_TEST(log, "waitReady ended, in queue {}", futures.size());
}
void WriteBufferFromS3::TaskTracker::waitAll()
{
LOG_TEST(log, "waitAll, in queue {}", futures.size());
/// Exceptions are propagated
for (auto & future : futures)
{
try
{
future.get();
} catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
throw;
}
}
futures.clear();
}
void WriteBufferFromS3::TaskTracker::safeWaitAll()
{
LOG_TEST(log, "safeWaitAll, wait in queue {}", futures.size());
/// Exceptions are not propagated
for (auto & future : futures)
{
LOG_TEST(log, "safeWaitAll, wait future");
if (future.valid())
future.wait();
}
LOG_TEST(log, "safeWaitAll, get in queue {}", futures.size());
for (auto & future : futures)
{
if (future.valid())
{
try
{
future.get();
} catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
futures.clear();
LOG_TEST(log, "safeWaitAll ended, get in queue {}", futures.size());
}
void WriteBufferFromS3::TaskTracker::add(Callback && func)
{
LOG_TEST(log, "add, in queue {}", futures.size());
auto future = scheduler(std::move(func), 0);
auto exit_scope = scope_guard(
[&future]()
{
future.wait();
}
);
futures.push_back(std::move(future));
exit_scope.release();
LOG_TEST(log, "add ended, in queue {}", futures.size());
}
bool WriteBufferFromS3::TaskTracker::isAsync() const
{
return is_async;
}
}
#endif

View File

@ -0,0 +1,43 @@
#pragma once
#include "config.h"
#if USE_AWS_S3
#include "WriteBufferFromS3.h"
namespace DB
{
/// This class is used only in WriteBufferFromS3 for now.
/// Therefore it is declared as a part of WriteBufferFromS3.
/// TaskTracker takes a Callback which is run by the scheduler in some external shared ThreadPool.
/// TaskTracker brings the methods waitReady, waitAll/safeWaitAll
/// to help with coordination of the running tasks.
class WriteBufferFromS3::TaskTracker
{
public:
using Callback = std::function<void()>;
explicit TaskTracker(ThreadPoolCallbackRunner<void> scheduler_);
~TaskTracker();
static ThreadPoolCallbackRunner<void> syncRunner();
bool isAsync() const;
void waitReady();
void waitAll();
void safeWaitAll();
void add(Callback && func);
private:
bool is_async;
ThreadPoolCallbackRunner<void> scheduler;
std::list<std::future<void>> futures;
Poco::Logger * log = &Poco::Logger::get("TaskTracker");
};
}
#endif
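
The synchronous runner above relies on std::packaged_task to capture exceptions in the returned future so they can be rethrown later from future::get(). A minimal sketch of that pattern with toy tasks (not the ClickHouse TaskTracker):

```cpp
#include <functional>
#include <future>
#include <iostream>
#include <list>
#include <stdexcept>

std::future<void> runSync(std::function<void()> callback)
{
    std::packaged_task<void()> task(std::move(callback));
    task();                    // an exception, if any, is stored in the future
    return task.get_future();
}

int main()
{
    std::list<std::future<void>> futures;
    futures.push_back(runSync([] { std::cout << "ok\n"; }));
    futures.push_back(runSync([] { throw std::runtime_error("boom"); }));
    for (auto & f : futures)
    {
        try { f.get(); } // rethrows the stored exception
        catch (const std::exception & e) { std::cout << "caught: " << e.what() << "\n"; }
    }
}
```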

File diff suppressed because it is too large

View File

@ -1220,7 +1220,7 @@ ResourceManagerPtr Context::getResourceManager() const
{
auto lock = getLock();
if (!shared->resource_manager)
shared->resource_manager = ResourceManagerFactory::instance().get(getConfigRef().getString("resource_manager", "static"));
shared->resource_manager = ResourceManagerFactory::instance().get(getConfigRef().getString("resource_manager", "dynamic"));
return shared->resource_manager;
}

View File

@ -30,6 +30,11 @@ void ASTRowPolicyName::replaceEmptyDatabase(const String & current_database)
full_name.database = current_database;
}
String ASTRowPolicyNames::tableOrAsterisk(const String & table_name) const
{
return table_name == RowPolicyName::ANY_TABLE_MARK ? "*" : backQuoteIfNeed(table_name);
}
void ASTRowPolicyNames::formatImpl(const FormatSettings & settings, FormatState &, FormatStateStacked) const
{
@ -73,7 +78,7 @@ void ASTRowPolicyNames::formatImpl(const FormatSettings & settings, FormatState
const String & table_name = full_name.table_name;
if (!database.empty())
settings.ostr << backQuoteIfNeed(database) + ".";
settings.ostr << backQuoteIfNeed(table_name);
settings.ostr << tableOrAsterisk(table_name);
}
}
else if (same_db_and_table_name)
@ -92,7 +97,7 @@ void ASTRowPolicyNames::formatImpl(const FormatSettings & settings, FormatState
settings.ostr << (settings.hilite ? hilite_keyword : "") << " ON " << (settings.hilite ? hilite_none : "");
if (!database.empty())
settings.ostr << backQuoteIfNeed(database) + ".";
settings.ostr << backQuoteIfNeed(table_name);
settings.ostr << tableOrAsterisk(table_name);
}
else
{
@ -108,7 +113,7 @@ void ASTRowPolicyNames::formatImpl(const FormatSettings & settings, FormatState
<< (settings.hilite ? hilite_none : "");
if (!database.empty())
settings.ostr << backQuoteIfNeed(database) + ".";
settings.ostr << backQuoteIfNeed(table_name);
settings.ostr << tableOrAsterisk(table_name);
}
}

View File

@ -45,5 +45,8 @@ public:
ASTPtr getRewrittenASTWithoutOnCluster(const WithoutOnClusterASTRewriteParams &) const override { return removeOnCluster<ASTRowPolicyNames>(clone()); }
void replaceEmptyDatabase(const String & current_database);
private:
String tableOrAsterisk(const String & table_name) const;
};
}

View File

@ -35,21 +35,21 @@ void ASTSettingsProfileElement::formatImpl(const FormatSettings & settings, Form
formatSettingName(setting_name, settings.ostr);
if (!value.isNull())
if (value)
{
settings.ostr << " = " << applyVisitor(FieldVisitorToString{}, value);
settings.ostr << " = " << applyVisitor(FieldVisitorToString{}, *value);
}
if (!min_value.isNull())
if (min_value)
{
settings.ostr << (settings.hilite ? IAST::hilite_keyword : "") << " MIN " << (settings.hilite ? IAST::hilite_none : "")
<< applyVisitor(FieldVisitorToString{}, min_value);
<< applyVisitor(FieldVisitorToString{}, *min_value);
}
if (!max_value.isNull())
if (max_value)
{
settings.ostr << (settings.hilite ? IAST::hilite_keyword : "") << " MAX " << (settings.hilite ? IAST::hilite_none : "")
<< applyVisitor(FieldVisitorToString{}, max_value);
<< applyVisitor(FieldVisitorToString{}, *max_value);
}
if (writability)

View File

@ -14,9 +14,9 @@ class ASTSettingsProfileElement : public IAST
public:
String parent_profile;
String setting_name;
Field value;
Field min_value;
Field max_value;
std::optional<Field> value;
std::optional<Field> min_value;
std::optional<Field> max_value;
std::optional<SettingConstraintWritability> writability;
bool id_mode = false; /// If true then `parent_profile` keeps UUID, not a name.
bool use_inherit_keyword = false; /// If true then this element is a part of ASTCreateSettingsProfileQuery.

View File

@ -26,8 +26,18 @@ namespace
return IParserBase::wrapParseImpl(pos, [&]
{
String res_database, res_table_name;
if (!parseDatabaseAndTableName(pos, expected, res_database, res_table_name))
bool is_any_database = false;
bool is_any_table = false;
if (!parseDatabaseAndTableNameOrAsterisks(pos, expected, res_database, is_any_database, res_table_name, is_any_table)
|| is_any_database)
{
return false;
}
else if (is_any_table)
{
res_table_name = RowPolicyName::ANY_TABLE_MARK;
}
/// If a table is specified without a DB, it cannot be followed by "ON"
/// (but can be followed by "ON CLUSTER").

View File

@ -52,7 +52,7 @@ namespace
}
bool parseValue(IParserBase::Pos & pos, Expected & expected, Field & res)
bool parseValue(IParserBase::Pos & pos, Expected & expected, std::optional<Field> & res)
{
return IParserBase::wrapParseImpl(pos, [&]
{
@ -69,7 +69,7 @@ namespace
}
bool parseMinMaxValue(IParserBase::Pos & pos, Expected & expected, Field & min_value, Field & max_value)
bool parseMinMaxValue(IParserBase::Pos & pos, Expected & expected, std::optional<Field> & min_value, std::optional<Field> & max_value)
{
return IParserBase::wrapParseImpl(pos, [&]
{
@ -124,9 +124,9 @@ namespace
IParserBase::Pos & pos,
Expected & expected,
String & setting_name,
Field & value,
Field & min_value,
Field & max_value,
std::optional<Field> & value,
std::optional<Field> & min_value,
std::optional<Field> & max_value,
std::optional<SettingConstraintWritability> & writability)
{
return IParserBase::wrapParseImpl(pos, [&]
@ -136,9 +136,9 @@ namespace
return false;
String res_setting_name = getIdentifierName(name_ast);
Field res_value;
Field res_min_value;
Field res_max_value;
std::optional<Field> res_value;
std::optional<Field> res_min_value;
std::optional<Field> res_max_value;
std::optional<SettingConstraintWritability> res_writability;
bool has_value_or_constraint = false;
@ -151,7 +151,7 @@ namespace
if (!has_value_or_constraint)
return false;
if (boost::iequals(res_setting_name, "PROFILE") && res_value.isNull() && res_min_value.isNull() && res_max_value.isNull()
if (boost::iequals(res_setting_name, "PROFILE") && !res_value && !res_min_value && !res_max_value
&& res_writability == SettingConstraintWritability::CONST)
{
/// Ambiguity: "profile readonly" can be treated either as a profile named "readonly" or
@ -181,9 +181,9 @@ namespace
{
String parent_profile;
String setting_name;
Field value;
Field min_value;
Field max_value;
std::optional<Field> value;
std::optional<Field> min_value;
std::optional<Field> max_value;
std::optional<SettingConstraintWritability> writability;
bool ok = parseSettingNameWithValueOrConstraints(pos, expected, setting_name, value, min_value, max_value, writability);

View File

@ -383,15 +383,6 @@ NamesAndTypesList ColumnsDescription::getEphemeral() const
return ret;
}
NamesAndTypesList ColumnsDescription::getWithDefaultExpression() const
{
NamesAndTypesList ret;
for (const auto & col : columns)
if (col.default_desc.expression)
ret.emplace_back(col.name, col.type);
return ret;
}
NamesAndTypesList ColumnsDescription::getAll() const
{
NamesAndTypesList ret;

View File

@ -132,7 +132,6 @@ public:
NamesAndTypesList getInsertable() const; /// ordinary + ephemeral
NamesAndTypesList getAliases() const;
NamesAndTypesList getEphemeral() const;
NamesAndTypesList getWithDefaultExpression() const; // columns with default expression, for example set by `CREATE TABLE` statement
NamesAndTypesList getAllPhysical() const; /// ordinary + materialized.
NamesAndTypesList getAll() const; /// ordinary + materialized + aliases + ephemeral
/// Returns .size0/.null/...

View File

@ -41,6 +41,7 @@
#include <Common/setThreadName.h>
#include <Formats/FormatFactory.h>
#include "Storages/ColumnDefault.h"
#include "config_version.h"
#include <Common/CurrentMetrics.h>
@ -966,9 +967,18 @@ void registerStorageKafka(StorageFactory & factory)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "kafka_poll_max_batch_size can not be lower than 1");
}
if (args.columns.getOrdinary() != args.columns.getAll() || !args.columns.getWithDefaultExpression().empty())
NamesAndTypesList supported_columns;
for (const auto & column : args.columns)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "KafkaEngine doesn't support DEFAULT/MATERIALIZED/EPHEMERAL/ALIAS expressions for columns. "
if (column.default_desc.kind == ColumnDefaultKind::Alias)
supported_columns.emplace_back(column.name, column.type);
if (column.default_desc.kind == ColumnDefaultKind::Default && !column.default_desc.expression)
supported_columns.emplace_back(column.name, column.type);
}
// The Kafka engine allows only ordinary columns without a default expression, or ALIAS columns.
if (args.columns.getAll() != supported_columns)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "KafkaEngine doesn't support DEFAULT/MATERIALIZED/EPHEMERAL expressions for columns. "
"See https://clickhouse.com/docs/en/engines/table-engines/integrations/kafka/#configuration");
}

View File

@ -290,6 +290,7 @@ void MergeTreeData::initializeDirectoriesAndFormatVersion(const std::string & re
{
auto buf = disk->writeFile(format_version_path, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, getContext()->getWriteSettings());
writeIntText(format_version.toUnderType(), *buf);
buf->finalize();
if (getContext()->getSettingsRef().fsync_metadata)
buf->sync();
}

View File

@ -160,7 +160,10 @@ void MergeTreeDeduplicationLog::rotate()
existing_logs.emplace(current_log_number, log_description);
if (current_writer)
{
current_writer->finalize();
current_writer->sync();
}
current_writer = disk->writeFile(log_description.path, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Append);
}
@ -227,7 +230,7 @@ std::pair<MergeTreePartInfo, bool> MergeTreeDeduplicationLog::addPart(const std:
return std::make_pair(info, false);
}
assert(current_writer != nullptr);
chassert(current_writer != nullptr);
/// Create new record
MergeTreeDeduplicationLogRecord record;
@ -257,7 +260,7 @@ void MergeTreeDeduplicationLog::dropPart(const MergeTreePartInfo & drop_part_inf
if (deduplication_window == 0)
return;
assert(current_writer != nullptr);
chassert(current_writer != nullptr);
for (auto itr = deduplication_map.begin(); itr != deduplication_map.end(); /* no increment here, we erasing from map */)
{

View File

@ -75,6 +75,7 @@ MergeTreeMutationEntry::MergeTreeMutationEntry(MutationCommands commands_, DiskP
TransactionID::write(tid, *out);
*out << "\n";
}
out->finalize();
out->sync();
}
catch (...)

View File

@ -777,7 +777,6 @@ public:
key,
configuration_.request_settings,
std::nullopt,
DBMS_DEFAULT_BUFFER_SIZE,
threadPoolCallbackRunner<void>(IOThreadPool::get(), "S3ParallelWrite"),
context->getWriteSettings()),
compression_method,

View File

@ -87,27 +87,27 @@ void StorageSystemSettingsProfileElements::fillData(MutableColumns & res_columns
size_t current_index = index++;
bool inserted_value = false;
if (!element.value.isNull() && !element.setting_name.empty())
if (element.value && !element.setting_name.empty())
{
String str = Settings::valueToStringUtil(element.setting_name, element.value);
String str = Settings::valueToStringUtil(element.setting_name, *element.value);
column_value.insertData(str.data(), str.length());
column_value_null_map.push_back(false);
inserted_value = true;
}
bool inserted_min = false;
if (!element.min_value.isNull() && !element.setting_name.empty())
if (element.min_value && !element.setting_name.empty())
{
String str = Settings::valueToStringUtil(element.setting_name, element.min_value);
String str = Settings::valueToStringUtil(element.setting_name, *element.min_value);
column_min.insertData(str.data(), str.length());
column_min_null_map.push_back(false);
inserted_min = true;
}
bool inserted_max = false;
if (!element.max_value.isNull() && !element.setting_name.empty())
if (element.max_value && !element.setting_name.empty())
{
String str = Settings::valueToStringUtil(element.setting_name, element.max_value);
String str = Settings::valueToStringUtil(element.setting_name, *element.max_value);
column_max.insertData(str.data(), str.length());
column_max_null_map.push_back(false);
inserted_max = true;

View File

@ -138,3 +138,4 @@
01600_parts_states_metrics_long
01600_parts_types_metrics_long
01287_max_execution_speed
02703_row_policy_for_database

View File

@ -285,11 +285,11 @@ def avro_confluent_message(schema_registry_client, value):
# Tests
def test_kafka_prohibited_column_types(kafka_cluster):
def test_kafka_column_types(kafka_cluster):
def assert_returned_exception(e):
assert e.value.returncode == 36
assert (
"KafkaEngine doesn't support DEFAULT/MATERIALIZED/EPHEMERAL/ALIAS expressions for columns."
"KafkaEngine doesn't support DEFAULT/MATERIALIZED/EPHEMERAL expressions for columns."
in str(e.value)
)
@ -314,17 +314,39 @@ def test_kafka_prohibited_column_types(kafka_cluster):
assert_returned_exception(exception)
# check ALIAS
with pytest.raises(QueryRuntimeException) as exception:
instance.query(
"""
CREATE TABLE test.kafka (a Int, b String Alias toString(a))
ENGINE = Kafka('{kafka_broker}:19092', '{kafka_topic_new}', '{kafka_group_name_new}', '{kafka_format_json_each_row}', '\\n')
SETTINGS kafka_commit_on_select = 1;
"""
)
assert_returned_exception(exception)
messages = []
for i in range(5):
messages.append(json.dumps({"a": i}))
kafka_produce(kafka_cluster, "new", messages)
result = ""
expected = TSV(
"""
0\t0
1\t1
2\t2
3\t3
4\t4
"""
)
retries = 50
while retries > 0:
result += instance.query("SELECT a, b FROM test.kafka", ignore_error=True)
if TSV(result) == expected:
break
retries -= 1
assert TSV(result) == expected
instance.query("DROP TABLE test.kafka SYNC")
# check MATERIALIZED
# check ALIAS
with pytest.raises(QueryRuntimeException) as exception:
instance.query(
"""

View File

@ -1,3 +1,4 @@
--- assigning ---
5 UInt8
-177 Int16
98.11 Float64
@ -6,7 +7,7 @@ custom_a UInt64_5
custom_b Int64_-177
custom_c Float64_98.11
custom_d \'abc def\'
--- modifying ---
changed String
\N Nullable(Nothing)
50000 UInt16
@ -15,9 +16,10 @@ custom_a \'changed\'
custom_b NULL
custom_c UInt64_50000
custom_d Float64_1.11
--- undefined setting ---
404 UInt16
--- wrong prefix ---
--- using query context ---
-0.333 Float64
custom_e Float64_-0.333
404 UInt16
@ -25,7 +27,13 @@ custom_e UInt64_404
word String
custom_f \'word\'
0
--- compound identifier ---
test String
custom_compound.identifier.v1 \'test\'
CREATE SETTINGS PROFILE s1_01418 SETTINGS custom_compound.identifier.v2 = 100
--- null type ---
\N Nullable(Nothing)
custom_null NULL
\N Nullable(Nothing)
custom_null NULL
CREATE SETTINGS PROFILE s2_01418 SETTINGS custom_null = NULL

View File

@ -1,3 +1,6 @@
DROP SETTINGS PROFILE IF EXISTS s1_01418, s2_01418;
SELECT '--- assigning ---';
SET custom_a = 5;
SET custom_b = -177;
SET custom_c = 98.11;
@ -8,7 +11,7 @@ SELECT getSetting('custom_c') as v, toTypeName(v);
SELECT getSetting('custom_d') as v, toTypeName(v);
SELECT name, value FROM system.settings WHERE name LIKE 'custom_%' ORDER BY name;
SELECT '';
SELECT '--- modifying ---';
SET custom_a = 'changed';
SET custom_b = NULL;
SET custom_c = 50000;
@ -19,14 +22,15 @@ SELECT getSetting('custom_c') as v, toTypeName(v);
SELECT getSetting('custom_d') as v, toTypeName(v);
SELECT name, value FROM system.settings WHERE name LIKE 'custom_%' ORDER BY name;
SELECT '';
SELECT '--- undefined setting ---';
SELECT getSetting('custom_e') as v, toTypeName(v); -- { serverError 115 } -- Setting not found.
SET custom_e = 404;
SELECT getSetting('custom_e') as v, toTypeName(v);
SELECT '--- wrong prefix ---';
SET invalid_custom = 8; -- { serverError 115 } -- Setting is neither a builtin nor started with one of the registered prefixes for user-defined settings.
SELECT '';
SELECT '--- using query context ---';
SELECT getSetting('custom_e') as v, toTypeName(v) SETTINGS custom_e = -0.333;
SELECT name, value FROM system.settings WHERE name = 'custom_e' SETTINGS custom_e = -0.333;
SELECT getSetting('custom_e') as v, toTypeName(v);
@ -37,7 +41,7 @@ SELECT name, value FROM system.settings WHERE name = 'custom_f' SETTINGS custom_
SELECT getSetting('custom_f') as v, toTypeName(v); -- { serverError 115 } -- Setting not found.
SELECT COUNT() FROM system.settings WHERE name = 'custom_f';
SELECT '';
SELECT '--- compound identifier ---';
SET custom_compound.identifier.v1 = 'test';
SELECT getSetting('custom_compound.identifier.v1') as v, toTypeName(v);
SELECT name, value FROM system.settings WHERE name = 'custom_compound.identifier.v1';
@ -45,3 +49,15 @@ SELECT name, value FROM system.settings WHERE name = 'custom_compound.identifier
CREATE SETTINGS PROFILE s1_01418 SETTINGS custom_compound.identifier.v2 = 100;
SHOW CREATE SETTINGS PROFILE s1_01418;
DROP SETTINGS PROFILE s1_01418;
SELECT '--- null type ---';
SELECT getSetting('custom_null') as v, toTypeName(v) SETTINGS custom_null = NULL;
SELECT name, value FROM system.settings WHERE name = 'custom_null' SETTINGS custom_null = NULL;
SET custom_null = NULL;
SELECT getSetting('custom_null') as v, toTypeName(v);
SELECT name, value FROM system.settings WHERE name = 'custom_null';
CREATE SETTINGS PROFILE s2_01418 SETTINGS custom_null = NULL;
SHOW CREATE SETTINGS PROFILE s2_01418;
DROP SETTINGS PROFILE s2_01418;

View File

@ -6,6 +6,7 @@ SET skip_download_if_exceeds_query_cache=1;
SET filesystem_cache_max_download_size=128;
DROP TABLE IF EXISTS test;
CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_cache_4', min_bytes_for_wide_part = 10485760, compress_marks=false, compress_primary_key=false;
SYSTEM DROP FILESYSTEM CACHE;
INSERT INTO test SELECT number, toString(number) FROM numbers(100);
SELECT * FROM test FORMAT Null;
SELECT file_segment_range_begin, file_segment_range_end, size FROM system.filesystem_cache ORDER BY file_segment_range_end, size;

View File

@ -9,8 +9,8 @@ SET filesystem_cache_max_download_size=128;
DROP TABLE IF EXISTS test;
CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_cache_4', min_bytes_for_wide_part = 10485760, compress_marks=false, compress_primary_key=false;
SYSTEM DROP FILESYSTEM CACHE;
INSERT INTO test SELECT number, toString(number) FROM numbers(100);
SELECT * FROM test FORMAT Null;
SELECT file_segment_range_begin, file_segment_range_end, size FROM system.filesystem_cache ORDER BY file_segment_range_end, size;
SYSTEM DROP FILESYSTEM CACHE;

View File

@ -1,13 +1,15 @@
Using storage policy: s3_cache
0
Expect cache
DOWNLOADED 0 0 1
DOWNLOADED 0 79 80
DOWNLOADED 0 745 746
2
3
Expect cache
DOWNLOADED 0 0 1
DOWNLOADED 0 79 80
DOWNLOADED 0 745 746
2
3
Expect no cache
Expect cache
DOWNLOADED 0 79 80
@ -15,13 +17,15 @@ DOWNLOADED 0 745 746
2
Expect no cache
Expect cache
DOWNLOADED 0 0 1
DOWNLOADED 0 79 80
DOWNLOADED 0 745 746
2
3
Expect cache
DOWNLOADED 0 0 1
DOWNLOADED 0 79 80
DOWNLOADED 0 745 746
2
3
Expect no cache
Expect cache
DOWNLOADED 0 79 80
@ -31,13 +35,15 @@ Expect no cache
Using storage policy: local_cache
0
Expect cache
DOWNLOADED 0 0 1
DOWNLOADED 0 79 80
DOWNLOADED 0 745 746
2
3
Expect cache
DOWNLOADED 0 0 1
DOWNLOADED 0 79 80
DOWNLOADED 0 745 746
2
3
Expect no cache
Expect cache
DOWNLOADED 0 79 80
@ -45,13 +51,15 @@ DOWNLOADED 0 745 746
2
Expect no cache
Expect cache
DOWNLOADED 0 0 1
DOWNLOADED 0 79 80
DOWNLOADED 0 745 746
2
3
Expect cache
DOWNLOADED 0 0 1
DOWNLOADED 0 79 80
DOWNLOADED 0 745 746
2
3
Expect no cache
Expect cache
DOWNLOADED 0 79 80

View File

@ -1,6 +1,6 @@
Using storage policy: s3_cache
0
0
0 0
Row 1:
──────
file_segment_range_begin: 0
@ -8,11 +8,11 @@ file_segment_range_end: 745
size: 746
state: DOWNLOADED
8
8
8 1100
0
2
2
8
8 1100
Row 1:
──────
file_segment_range_begin: 0
@ -20,17 +20,17 @@ file_segment_range_end: 1659
size: 1660
state: DOWNLOADED
8
8
8
8
24
35
43
8 2014
8 2014
8 2014
24 84045
35 168815
44 252113
5010500
18816
Using storage policy: local_cache
0
0
0 0
Row 1:
──────
file_segment_range_begin: 0
@ -38,11 +38,11 @@ file_segment_range_end: 745
size: 746
state: DOWNLOADED
8
8
8 1100
0
2
2
8
8 1100
Row 1:
──────
file_segment_range_begin: 0
@ -50,11 +50,11 @@ file_segment_range_end: 1659
size: 1660
state: DOWNLOADED
8
8
8
8
24
35
43
8 2014
8 2014
8 2014
24 84045
35 168815
44 252113
5010500
18816

View File

@ -33,7 +33,7 @@ for STORAGE_POLICY in 's3_cache' 'local_cache'; do
FORMAT Vertical"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100)"
@ -54,7 +54,7 @@ for STORAGE_POLICY in 's3_cache' 'local_cache'; do
FORMAT Vertical"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache WHERE cache_hits > 0"
@ -64,7 +64,7 @@ for STORAGE_POLICY in 's3_cache' 'local_cache'; do
$CLICKHOUSE_CLIENT --query "SELECT * FROM test_02241 FORMAT Null"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache WHERE cache_hits > 0"
$CLICKHOUSE_CLIENT --query "SELECT count() size FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) size FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --query "SYSTEM DROP FILESYSTEM CACHE"
@ -87,24 +87,23 @@ for STORAGE_POLICY in 's3_cache' 'local_cache'; do
FORMAT Vertical;"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100) SETTINGS enable_filesystem_cache_on_write_operations=0"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(100)"
$CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(300, 10000)"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --query "SYSTEM START MERGES test_02241"
$CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --query "OPTIMIZE TABLE test_02241 FINAL"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --mutations_sync=2 --query "ALTER TABLE test_02241 UPDATE value = 'kek' WHERE key = 100"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --query "SELECT count(), sum(size) FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --enable_filesystem_cache_on_write_operations=1 --query "INSERT INTO test_02241 SELECT number, toString(number) FROM numbers(5000000)"
$CLICKHOUSE_CLIENT --query "SYSTEM FLUSH LOGS"

View File

@ -8,7 +8,7 @@ SYSTEM STOP MERGES nopers;
INSERT INTO nopers SELECT number, toString(number) FROM numbers(10);
SELECT * FROM nopers FORMAT Null;
SELECT sum(size) FROM system.filesystem_cache;
194
195
SELECT extract(local_path, '.*/([\w.]+)') as file, extract(cache_path, '.*/([\w.]+)') as cache, size
FROM
(
@ -21,17 +21,18 @@ ON data_paths.cache_path = caches.cache_path
ORDER BY file, cache, size;
data.bin 0 114
data.mrk3 0 80
format_version.txt 0 1
DROP TABLE IF EXISTS test;
CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_cache_small', min_bytes_for_wide_part = 10485760, compress_marks=false, compress_primary_key=false;
SYSTEM STOP MERGES test;
INSERT INTO test SELECT number, toString(number) FROM numbers(100);
SELECT * FROM test FORMAT Null;
SELECT sum(size) FROM system.filesystem_cache;
1020
1021
SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path;
4
5
SELECT count() FROM system.filesystem_cache;
4
5
SELECT extract(local_path, '.*/([\w.]+)') as file, extract(cache_path, '.*/([\w.]+)') as cache, size
FROM
(
@ -46,17 +47,18 @@ data.bin 0 114
data.bin 0 746
data.mrk3 0 80
data.mrk3 0_persistent 80
format_version.txt 0 1
DROP TABLE IF EXISTS test2;
CREATE TABLE test2 (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_cache_small', min_bytes_for_wide_part = 10485760, compress_marks=false, compress_primary_key=false;
SYSTEM STOP MERGES test2;
INSERT INTO test2 SELECT number, toString(number) FROM numbers(100000);
SELECT * FROM test2 FORMAT Null;
SELECT sum(size) FROM system.filesystem_cache;
794
795
SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path;
4
5
SELECT count() FROM system.filesystem_cache;
4
5
SELECT extract(local_path, '.*/([\w.]+)') as file, extract(cache_path, '.*/([\w.]+)') as cache, size
FROM
(
@ -71,6 +73,7 @@ data.bin 0 114
data.mrk3 0 80
data.mrk3 0_persistent 80
data.mrk3 0_persistent 520
format_version.txt 0 1
DROP TABLE test;
DROP TABLE test2;
DROP TABLE nopers;

View File

@ -1,8 +1,8 @@
INSERT TO S3
[ 0 ] S3CompleteMultipartUpload: 1
[ 0 ] S3CreateMultipartUpload: 1
[ 0 ] S3HeadObject: 1
[ 0 ] S3ReadRequestsCount: 1
[ 0 ] S3HeadObject: 2
[ 0 ] S3ReadRequestsCount: 2
[ 0 ] S3UploadPart: 1
[ 0 ] S3WriteRequestsCount: 3
CHECK WITH query_log

View File

@ -0,0 +1,2 @@
Policy for table `*` does not affect other tables in the database
other 100 20

View File

@ -0,0 +1,11 @@
-- Tags: no-parallel
SELECT 'Policy for table `*` does not affect other tables in the database';
CREATE DATABASE 02703_db_asterisk;
CREATE ROW POLICY 02703_asterisk ON 02703_db_asterisk.`*` USING x=1 AS permissive TO ALL;
CREATE TABLE 02703_db_asterisk.`*` (x UInt8, y UInt8) ENGINE = MergeTree ORDER BY x AS SELECT 100, 20;
CREATE TABLE 02703_db_asterisk.`other` (x UInt8, y UInt8) ENGINE = MergeTree ORDER BY x AS SELECT 100, 20;
SELECT 'star', * FROM 02703_db_asterisk.`*`;
SELECT 'other', * FROM 02703_db_asterisk.other;
DROP ROW POLICY 02703_asterisk ON 02703_db_asterisk.`*`;
DROP DATABASE 02703_db_asterisk;

View File

@ -0,0 +1,42 @@
None
1 10
2 20
3 30
4 40
R1: x == 1
1 10
R1, R2: (x == 1) OR (x == 2)
1 10
2 20
R1, R2: (x == 2) FROM ANOTHER
2 20
R1, R2, R3: (x == 1) OR (x == 2) OR (x == 3)
1 10
2 20
3 30
R1, R2, R3, R4: ((x == 1) OR (x == 2) OR (x == 3)) AND (x <= 2)
1 10
2 20
R1, R2, R3, R4, R5: ((x == 1) OR (x == 2) OR (x == 3)) AND (x <= 2) AND (y >= 20)
2 20
2 20
R1, R2, R3, R4, R5: (x == 2) AND (y >= 20) FROM AFTER_RP
2 20
R1, R2, R3, R4, R5: (x == 2) AND (y >= 20) FROM ANOTHER
2 20
R2, R3, R4, R5: ((x == 2) OR (x == 3)) AND (x <= 2) AND (y >= 20)
2 20
R3, R4, R5: (x == 3) AND (x <= 2) AND (y >= 20)
R4, R5: (x <= 2) AND (y >= 20)
2 20
R5: (x >= 2)
2 20
3 30
4 40
Policy not applicable
None
1 10
2 20
3 30
4 40
No problematic policy, select works

View File

@ -0,0 +1,88 @@
-- Tags: no-parallel
DROP DATABASE IF EXISTS 02703_db;
CREATE DATABASE 02703_db;
DROP TABLE IF EXISTS 02703_db.02703_rptable;
DROP TABLE IF EXISTS 02703_db.02703_rptable_another;
CREATE TABLE 02703_db.02703_rptable (x UInt8, y UInt8) ENGINE = MergeTree ORDER BY x;
INSERT INTO 02703_db.02703_rptable VALUES (1, 10), (2, 20), (3, 30), (4, 40);
CREATE TABLE 02703_db.02703_rptable_another ENGINE = MergeTree ORDER BY x AS SELECT * FROM 02703_db.02703_rptable;
DROP ROW POLICY IF EXISTS 02703_filter_1 ON 02703_db.02703_rptable;
DROP ROW POLICY IF EXISTS 02703_filter_2 ON 02703_db.*;
DROP ROW POLICY IF EXISTS 02703_filter_3 ON 02703_db.02703_rptable;
DROP ROW POLICY IF EXISTS 02703_filter_4 ON 02703_db.02703_rptable;
DROP ROW POLICY IF EXISTS 02703_filter_5 ON 02703_db.*;
-- the test assumes users_without_row_policies_can_read_rows is true
SELECT 'None';
SELECT * FROM 02703_db.02703_rptable;
CREATE ROW POLICY 02703_filter_1 ON 02703_db.02703_rptable USING x=1 AS permissive TO ALL;
SELECT 'R1: x == 1';
SELECT * FROM 02703_db.02703_rptable;
CREATE ROW POLICY 02703_filter_2 ON 02703_db.* USING x=2 AS permissive TO ALL;
SELECT 'R1, R2: (x == 1) OR (x == 2)';
SELECT * FROM 02703_db.02703_rptable;
SELECT 'R1, R2: (x == 2) FROM ANOTHER';
SELECT * FROM 02703_db.02703_rptable_another;
CREATE ROW POLICY 02703_filter_3 ON 02703_db.02703_rptable USING x=3 AS permissive TO ALL;
SELECT 'R1, R2, R3: (x == 1) OR (x == 2) OR (x == 3)';
SELECT * FROM 02703_db.02703_rptable;
CREATE ROW POLICY 02703_filter_4 ON 02703_db.02703_rptable USING x<=2 AS restrictive TO ALL;
SELECT 'R1, R2, R3, R4: ((x == 1) OR (x == 2) OR (x == 3)) AND (x <= 2)';
SELECT * FROM 02703_db.02703_rptable;
CREATE ROW POLICY 02703_filter_5 ON 02703_db.* USING y>=20 AS restrictive TO ALL;
SELECT 'R1, R2, R3, R4, R5: ((x == 1) OR (x == 2) OR (x == 3)) AND (x <= 2) AND (y >= 20)';
SELECT * FROM 02703_db.02703_rptable;
CREATE TABLE 02703_db.02703_after_rp ENGINE = MergeTree ORDER BY x AS SELECT * FROM 02703_db.02703_rptable;
SELECT * FROM 02703_db.02703_after_rp;
-- it does not matter whether the policies or the table are created first
SELECT 'R1, R2, R3, R4, R5: (x == 2) AND (y >= 20) FROM AFTER_RP';
SELECT * FROM 02703_db.02703_after_rp;
SELECT 'R1, R2, R3, R4, R5: (x == 2) AND (y >= 20) FROM ANOTHER';
SELECT * FROM 02703_db.02703_rptable_another;
DROP ROW POLICY 02703_filter_1 ON 02703_db.02703_rptable;
SELECT 'R2, R3, R4, R5: ((x == 2) OR (x == 3)) AND (x <= 2) AND (y >= 20)';
SELECT * FROM 02703_db.02703_rptable;
DROP ROW POLICY 02703_filter_2 ON 02703_db.*;
SELECT 'R3, R4, R5: (x == 3) AND (x <= 2) AND (y >= 20)';
SELECT * FROM 02703_db.02703_rptable;
DROP ROW POLICY 02703_filter_3 ON 02703_db.02703_rptable;
SELECT 'R4, R5: (x <= 2) AND (y >= 20)';
SELECT * FROM 02703_db.02703_rptable;
DROP ROW POLICY 02703_filter_4 ON 02703_db.02703_rptable;
SELECT 'R5: (y >= 20)';
SELECT * FROM 02703_db.02703_rptable;
CREATE TABLE 02703_db.02703_unexpected_columns (xx UInt8, yy UInt8) ENGINE = MergeTree ORDER BY xx;
SELECT 'Policy not applicable';
SELECT * FROM 02703_db.02703_unexpected_columns; -- { serverError 47 } -- Missing columns: 'x' while processing query
DROP ROW POLICY 02703_filter_5 ON 02703_db.*;
SELECT 'None';
SELECT * FROM 02703_db.02703_rptable;
SELECT 'No problematic policy, select works';
SELECT 'Ok' FROM 02703_db.02703_unexpected_columns;
DROP TABLE 02703_db.02703_rptable;
DROP TABLE 02703_db.02703_rptable_another;
DROP TABLE 02703_db.02703_unexpected_columns;
DROP DATABASE 02703_db;
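
A condensed sketch of the combination rule the test above walks through, with hypothetical names (`mydb.t`, user `reader`): permissive policies are OR-ed together, and the result is AND-ed with every restrictive policy.

``` sql
CREATE TABLE mydb.t (x UInt8, y UInt8) ENGINE = MergeTree ORDER BY x;

CREATE ROW POLICY p1 ON mydb.t USING x = 1 AS permissive TO reader;
CREATE ROW POLICY p2 ON mydb.t USING x = 2 AS permissive TO reader;
CREATE ROW POLICY r1 ON mydb.t USING y >= 20 AS restrictive TO reader;

-- For `reader` the effective filter on mydb.t is:
--   ((x = 1) OR (x = 2)) AND (y >= 20)
SELECT * FROM mydb.t;
```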

View File

@ -0,0 +1,20 @@
-- row policies for database
-- SHOW CREATE POLICY db1_02703 ON db1_02703.*
CREATE ROW POLICY db1_02703 ON db1_02703.* FOR SELECT USING 1 TO ALL
-- SHOW CREATE POLICY ON db1_02703.*
CREATE ROW POLICY db1_02703 ON db1_02703.* FOR SELECT USING 1 TO ALL
CREATE ROW POLICY tbl1_02703 ON db1_02703.table FOR SELECT USING 1 TO ALL
-- SHOW CREATE POLICY ON db1_02703.`*`
R1, R2: (x == 1) OR (x == 2)
1
2
Check system.query_log
SELECT \'-- row policies for database\'; []
SELECT \' -- SHOW CREATE POLICY db1_02703 ON db1_02703.*\'; []
SELECT \' -- SHOW CREATE POLICY ON db1_02703.*\'; []
SELECT \' -- SHOW CREATE POLICY ON db1_02703.`*`\'; []
SELECT \'R1, R2: (x == 1) OR (x == 2)\'; []
SELECT * FROM 02703_rqtable_default; ['`02703_filter_11_db` ON default.*','`02703_filter_11` ON default.`02703_rqtable_default`']
SELECT \'Check system.query_log\'; []
-- CREATE DATABASE-LEVEL POLICY IN CURRENT DATABASE
CREATE ROW POLICY db2_02703 ON db1_02703.* TO u1_02703

View File

@ -0,0 +1,53 @@
-- Tags: no-parallel
DROP DATABASE IF EXISTS db1_02703;
DROP USER IF EXISTS u1_02703;
CREATE USER u1_02703;
CREATE DATABASE db1_02703;
CREATE TABLE db1_02703.02703_rqtable (x UInt8) ENGINE = MergeTree ORDER BY x;
INSERT INTO db1_02703.02703_rqtable VALUES (1), (2), (3), (4);
SELECT '-- row policies for database';
CREATE ROW POLICY db1_02703 ON db1_02703.* USING 1 AS PERMISSIVE TO ALL;
CREATE ROW POLICY tbl1_02703 ON db1_02703.table USING 1 AS PERMISSIVE TO ALL;
SELECT ' -- SHOW CREATE POLICY db1_02703 ON db1_02703.*';
SHOW CREATE POLICY db1_02703 ON db1_02703.*;
SELECT ' -- SHOW CREATE POLICY ON db1_02703.*';
SHOW CREATE POLICY ON db1_02703.*;
SELECT ' -- SHOW CREATE POLICY ON db1_02703.`*`';
SHOW CREATE POLICY ON db1_02703.`*`;
DROP POLICY db1_02703 ON db1_02703.*;
DROP POLICY tbl1_02703 ON db1_02703.table;
CREATE ROW POLICY any_02703 ON *.some_table USING 1 AS PERMISSIVE TO ALL; -- { clientError 62 }
CREATE TABLE 02703_rqtable_default (x UInt8) ENGINE = MergeTree ORDER BY x;
CREATE ROW POLICY 02703_filter_11_db ON * USING x=1 AS permissive TO ALL;
CREATE ROW POLICY 02703_filter_11 ON 02703_rqtable_default USING x=2 AS permissive TO ALL;
INSERT INTO 02703_rqtable_default VALUES (1), (2), (3), (4);
SELECT 'R1, R2: (x == 1) OR (x == 2)';
SELECT * FROM 02703_rqtable_default;
DROP TABLE 02703_rqtable_default;
SELECT 'Check system.query_log';
SYSTEM FLUSH LOGS;
SELECT query, used_row_policies FROM system.query_log WHERE current_database == currentDatabase() AND type == 'QueryStart' AND query_kind == 'Select' ORDER BY event_time_microseconds;
DROP ROW POLICY 02703_filter_11_db ON *;
DROP ROW POLICY 02703_filter_11 ON 02703_rqtable_default;
USE db1_02703;
SELECT ' -- CREATE DATABASE-LEVEL POLICY IN CURRENT DATABASE';
CREATE ROW POLICY db2_02703 ON * TO u1_02703;
SHOW CREATE POLICY db2_02703 ON *;
DROP ROW POLICY db2_02703 ON *;
DROP USER u1_02703;
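
As a follow-up usage sketch (hypothetical names, not part of the test): a database-level policy created with `ON mydb.*` applies the same filter to every table in `mydb`, but only for the users it is granted to.

``` sql
CREATE USER analyst;
CREATE ROW POLICY tenant_filter ON mydb.* USING tenant_id = 42 AS permissive TO analyst;

-- `analyst` now sees only rows with tenant_id = 42 in tables of mydb;
-- queries on tables without a tenant_id column fail with a missing-column
-- error, as the first test above shows. Other users are unaffected.
SHOW CREATE POLICY tenant_filter ON mydb.*;
```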

View File

@ -1,4 +1,4 @@
Size: 6000001
Size: 6000001
Size: 6000001
Size: 2971517
part size: 6000001, part number: 1
part size: 6000001, part number: 2
part size: 6000001, part number: 3
part size: 2971517, part number: 4

View File

@ -19,7 +19,7 @@ $CLICKHOUSE_LOCAL -q "SELECT randomPrintableASCII(1023) FROM numbers(20*1024) FO
$CLICKHOUSE_CLIENT --send_logs_level=trace --server_logs_file="$log" -q "INSERT INTO FUNCTION s3(s3_conn, filename='$CLICKHOUSE_TEST_UNIQUE_NAME', format='LineAsString', structure='line String') FORMAT LineAsString" --s3_strict_upload_part_size=6000001 < "$in"
grep -F '<Fatal>' "$log" || :
grep -o 'WriteBufferFromS3: Writing part.*Size: .*' "$log" | grep -o 'Size: .*'
grep -o 'WriteBufferFromS3: writePart.*, part size: .*' "$log" | grep -o 'part size: .*'
$CLICKHOUSE_CLIENT -q "SELECT * FROM s3(s3_conn, filename='$CLICKHOUSE_TEST_UNIQUE_NAME', format='LineAsString', structure='line String') FORMAT LineAsString" > "$out"
diff -q "$in" "$out"

View File

@ -0,0 +1 @@
12639441726720293784

View File

@ -0,0 +1,7 @@
-- Tags: no-fasttest
-- Tag no-fasttest: Depends on AWS
-- Reading a Parquet file of between ~1 MB and ~2 MB in size from S3 was broken at some point;
-- 170000 cityHash64 values are roughly 1.3 MB of poorly compressible data, which falls into that range.
insert into function s3(s3_conn, filename='test_02731_parquet_s3.parquet') select cityHash64(number) from numbers(170000) settings s3_truncate_on_insert=1;
select sum(*) from s3(s3_conn, filename='test_02731_parquet_s3.parquet') settings remote_filesystem_read_method='threadpool', remote_filesystem_read_prefetch=1;