mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 16:42:05 +00:00
Merge branch 'master' into better_parallel_hash3
This commit is contained in:
commit
68ade3c621
2
.github/ISSUE_TEMPLATE/20_feature-request.md
vendored
2
.github/ISSUE_TEMPLATE/20_feature-request.md
vendored
@ -15,7 +15,7 @@ assignees: ''
|
||||
|
||||
**Use case**
|
||||
|
||||
> A clear and concise description of what is the intended usage scenario is.
|
||||
> A clear and concise description of what the intended usage scenario is.
|
||||
|
||||
**Describe the solution you'd like**
|
||||
|
||||
|
@ -63,7 +63,34 @@ Currently there are 3 ways to authenticate:
|
||||
- `SAS Token` - Can be used by providing an `endpoint`, `connection_string` or `storage_account_url`. It is identified by presence of '?' in the url.
|
||||
- `Workload Identity` - Can be used by providing an `endpoint` or `storage_account_url`. If `use_workload_identity` parameter is set in config, ([workload identity](https://github.com/Azure/azure-sdk-for-cpp/tree/main/sdk/identity/azure-identity#authenticate-azure-hosted-applications)) is used for authentication.
|
||||
|
||||
### Data cache {#data-cache}
|
||||
|
||||
`Azure` table engine supports data caching on local disk.
|
||||
See filesystem cache configuration options and usage in this [section](/docs/en/operations/storing-data.md/#using-local-cache).
|
||||
Caching is made depending on the path and ETag of the storage object, so clickhouse will not read a stale cache version.
|
||||
|
||||
To enable caching use a setting `filesystem_cache_name = '<name>'` and `enable_filesystem_cache = 1`.
|
||||
|
||||
```sql
|
||||
SELECT *
|
||||
FROM azureBlobStorage('DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1/;', 'test_container', 'test_table', 'CSV')
|
||||
SETTINGS filesystem_cache_name = 'cache_for_azure', enable_filesystem_cache = 1;
|
||||
```
|
||||
|
||||
1. add the following section to clickhouse configuration file:
|
||||
|
||||
``` xml
|
||||
<clickhouse>
|
||||
<filesystem_caches>
|
||||
<cache_for_azure>
|
||||
<path>path to cache directory</path>
|
||||
<max_size>10Gi</max_size>
|
||||
</cache_for_azure>
|
||||
</filesystem_caches>
|
||||
</clickhouse>
|
||||
```
|
||||
|
||||
2. reuse cache configuration (and therefore cache storage) from clickhouse `storage_configuration` section, [described here](/docs/en/operations/storing-data.md/#using-local-cache)
|
||||
|
||||
## See also
|
||||
|
||||
|
@ -48,6 +48,10 @@ Using named collections:
|
||||
CREATE TABLE deltalake ENGINE=DeltaLake(deltalake_conf, filename = 'test_table')
|
||||
```
|
||||
|
||||
### Data cache {#data-cache}
|
||||
|
||||
`Iceberg` table engine and table function support data caching same as `S3`, `AzureBlobStorage`, `HDFS` storages. See [here](../../../engines/table-engines/integrations/s3.md#data-cache).
|
||||
|
||||
## See also
|
||||
|
||||
- [deltaLake table function](../../../sql-reference/table-functions/deltalake.md)
|
||||
|
@ -63,6 +63,10 @@ CREATE TABLE iceberg_table ENGINE=IcebergS3(iceberg_conf, filename = 'test_table
|
||||
|
||||
Table engine `Iceberg` is an alias to `IcebergS3` now.
|
||||
|
||||
### Data cache {#data-cache}
|
||||
|
||||
`Iceberg` table engine and table function support data caching same as `S3`, `AzureBlobStorage`, `HDFS` storages. See [here](../../../engines/table-engines/integrations/s3.md#data-cache).
|
||||
|
||||
## See also
|
||||
|
||||
- [iceberg table function](/docs/en/sql-reference/table-functions/iceberg.md)
|
||||
|
@ -26,6 +26,7 @@ SELECT * FROM s3_engine_table LIMIT 2;
|
||||
│ two │ 2 │
|
||||
└──────┴───────┘
|
||||
```
|
||||
|
||||
## Create Table {#creating-a-table}
|
||||
|
||||
``` sql
|
||||
@ -43,6 +44,37 @@ CREATE TABLE s3_engine_table (name String, value UInt32)
|
||||
- `aws_access_key_id`, `aws_secret_access_key` - Long-term credentials for the [AWS](https://aws.amazon.com/) account user. You can use these to authenticate your requests. Parameter is optional. If credentials are not specified, they are used from the configuration file. For more information see [Using S3 for Data Storage](../mergetree-family/mergetree.md#table_engine-mergetree-s3).
|
||||
- `compression` — Compression type. Supported values: `none`, `gzip/gz`, `brotli/br`, `xz/LZMA`, `zstd/zst`. Parameter is optional. By default, it will auto-detect compression by file extension.
|
||||
|
||||
### Data cache {#data-cache}
|
||||
|
||||
`S3` table engine supports data caching on local disk.
|
||||
See filesystem cache configuration options and usage in this [section](/docs/en/operations/storing-data.md/#using-local-cache).
|
||||
Caching is made depending on the path and ETag of the storage object, so clickhouse will not read a stale cache version.
|
||||
|
||||
To enable caching use a setting `filesystem_cache_name = '<name>'` and `enable_filesystem_cache = 1`.
|
||||
|
||||
```sql
|
||||
SELECT *
|
||||
FROM s3('http://minio:10000/clickhouse//test_3.csv', 'minioadmin', 'minioadminpassword', 'CSV')
|
||||
SETTINGS filesystem_cache_name = 'cache_for_s3', enable_filesystem_cache = 1;
|
||||
```
|
||||
|
||||
There are two ways to define cache in configuration file.
|
||||
|
||||
1. add the following section to clickhouse configuration file:
|
||||
|
||||
``` xml
|
||||
<clickhouse>
|
||||
<filesystem_caches>
|
||||
<cache_for_s3>
|
||||
<path>path to cache directory</path>
|
||||
<max_size>10Gi</max_size>
|
||||
</cache_for_s3>
|
||||
</filesystem_caches>
|
||||
</clickhouse>
|
||||
```
|
||||
|
||||
2. reuse cache configuration (and therefore cache storage) from clickhouse `storage_configuration` section, [described here](/docs/en/operations/storing-data.md/#using-local-cache)
|
||||
|
||||
### PARTITION BY
|
||||
|
||||
`PARTITION BY` — Optional. In most cases you don't need a partition key, and if it is needed you generally don't need a partition key more granular than by month. Partitioning does not speed up queries (in contrast to the ORDER BY expression). You should never use too granular partitioning. Don't partition your data by client identifiers or names (instead, make client identifier or name the first column in the ORDER BY expression).
|
||||
|
@ -9721,6 +9721,10 @@ Default value: 15
|
||||
|
||||
The heartbeat interval in seconds to indicate watch query is alive.
|
||||
|
||||
## enforce_strict_identifier_format
|
||||
|
||||
If enabled, only allow identifiers containing alphanumeric characters and underscores.
|
||||
|
||||
## workload {#workload}
|
||||
|
||||
Type: String
|
||||
|
@ -1496,6 +1496,8 @@ try
|
||||
|
||||
NamedCollectionFactory::instance().loadIfNot();
|
||||
|
||||
FileCacheFactory::instance().loadDefaultCaches(config());
|
||||
|
||||
/// Initialize main config reloader.
|
||||
std::string include_from_path = config().getString("include_from", "/etc/metrika.xml");
|
||||
|
||||
|
@ -78,11 +78,6 @@ struct WindowFunction : public IAggregateFunctionHelper<WindowFunction>, public
|
||||
}
|
||||
|
||||
String getName() const override { return name; }
|
||||
void create(AggregateDataPtr __restrict) const override { }
|
||||
void destroy(AggregateDataPtr __restrict) const noexcept override { }
|
||||
bool hasTrivialDestructor() const override { return true; }
|
||||
size_t sizeOfData() const override { return 0; }
|
||||
size_t alignOfData() const override { return 1; }
|
||||
void add(AggregateDataPtr __restrict, const IColumn **, size_t, Arena *) const override { fail(); }
|
||||
void merge(AggregateDataPtr __restrict, ConstAggregateDataPtr, Arena *) const override { fail(); }
|
||||
void serialize(ConstAggregateDataPtr __restrict, WriteBuffer &, std::optional<size_t>) const override { fail(); }
|
||||
@ -90,6 +85,22 @@ struct WindowFunction : public IAggregateFunctionHelper<WindowFunction>, public
|
||||
void insertResultInto(AggregateDataPtr __restrict, IColumn &, Arena *) const override { fail(); }
|
||||
};
|
||||
|
||||
struct StatelessWindowFunction : public WindowFunction
|
||||
{
|
||||
StatelessWindowFunction(
|
||||
const std::string & name_, const DataTypes & argument_types_, const Array & parameters_, const DataTypePtr & result_type_)
|
||||
: WindowFunction(name_, argument_types_, parameters_, result_type_)
|
||||
{
|
||||
}
|
||||
|
||||
size_t sizeOfData() const override { return 0; }
|
||||
size_t alignOfData() const override { return 1; }
|
||||
|
||||
void create(AggregateDataPtr __restrict) const override { }
|
||||
void destroy(AggregateDataPtr __restrict) const noexcept override { }
|
||||
bool hasTrivialDestructor() const override { return true; }
|
||||
};
|
||||
|
||||
template <typename State>
|
||||
struct StatefulWindowFunction : public WindowFunction
|
||||
{
|
||||
@ -100,7 +111,7 @@ struct StatefulWindowFunction : public WindowFunction
|
||||
}
|
||||
|
||||
size_t sizeOfData() const override { return sizeof(State); }
|
||||
size_t alignOfData() const override { return 1; }
|
||||
size_t alignOfData() const override { return alignof(State); }
|
||||
|
||||
void create(AggregateDataPtr __restrict place) const override { new (place) State(); }
|
||||
|
||||
|
@ -4812,6 +4812,9 @@ Max attempts to read with backoff
|
||||
)", 0) \
|
||||
M(Bool, enable_filesystem_cache, true, R"(
|
||||
Use cache for remote filesystem. This setting does not turn on/off cache for disks (must be done via disk config), but allows to bypass cache for some queries if intended
|
||||
)", 0) \
|
||||
M(String, filesystem_cache_name, "", R"(
|
||||
Filesystem cache name to use for stateless table engines or data lakes
|
||||
)", 0) \
|
||||
M(Bool, enable_filesystem_cache_on_write_operations, false, R"(
|
||||
Write into cache on write operations. To actually work this setting requires be added to disk config too
|
||||
@ -5498,8 +5501,8 @@ Replace external dictionary sources to Null on restore. Useful for testing purpo
|
||||
M(Bool, create_if_not_exists, false, R"(
|
||||
Enable `IF NOT EXISTS` for `CREATE` statement by default. If either this setting or `IF NOT EXISTS` is specified and a table with the provided name already exists, no exception will be thrown.
|
||||
)", 0) \
|
||||
M(Bool, enable_secure_identifiers, false, R"(
|
||||
If enabled, only allow secure identifiers which contain only underscore and alphanumeric characters
|
||||
M(Bool, enforce_strict_identifier_format, false, R"(
|
||||
If enabled, only allow identifiers containing alphanumeric characters and underscores.
|
||||
)", 0) \
|
||||
M(Bool, mongodb_throw_on_unsupported_query, true, R"(
|
||||
If enabled, MongoDB tables will return an error when a MongoDB query cannot be built. Otherwise, ClickHouse reads the full table and processes it locally. This option does not apply to the legacy implementation or when 'allow_experimental_analyzer=0'.
|
||||
|
@ -68,17 +68,18 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
|
||||
},
|
||||
{"24.10",
|
||||
{
|
||||
{"enforce_strict_identifier_format", false, false, "New setting."},
|
||||
{"enable_parsing_to_custom_serialization", false, true, "New setting"},
|
||||
{"mongodb_throw_on_unsupported_query", false, true, "New setting."},
|
||||
{"enable_parallel_replicas", false, false, "Parallel replicas with read tasks became the Beta tier feature."},
|
||||
{"parallel_replicas_mode", "read_tasks", "read_tasks", "This setting was introduced as a part of making parallel replicas feature Beta"},
|
||||
{"filesystem_cache_name", "", "", "Filesystem cache name to use for stateless table engines or data lakes"},
|
||||
{"restore_replace_external_dictionary_source_to_null", false, false, "New setting."},
|
||||
{"show_create_query_identifier_quoting_rule", "when_necessary", "when_necessary", "New setting."},
|
||||
{"show_create_query_identifier_quoting_style", "Backticks", "Backticks", "New setting."},
|
||||
{"output_format_native_write_json_as_string", false, false, "Add new setting to allow write JSON column as single String column in Native format"},
|
||||
{"output_format_binary_write_json_as_string", false, false, "Add new setting to write values of JSON type as JSON string in RowBinary output format"},
|
||||
{"input_format_binary_read_json_as_string", false, false, "Add new setting to read values of JSON type as JSON string in RowBinary input format"},
|
||||
{"enable_secure_identifiers", false, false, "New setting."},
|
||||
{"min_free_disk_bytes_to_perform_insert", 0, 0, "New setting."},
|
||||
{"min_free_disk_ratio_to_perform_insert", 0.0, 0.0, "New setting."},
|
||||
{"cloud_mode_database_engine", 1, 1, "A setting for ClickHouse Cloud"},
|
||||
@ -111,7 +112,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
|
||||
{"allow_materialized_view_with_bad_select", true, true, "Support (but not enable yet) stricter validation in CREATE MATERIALIZED VIEW"},
|
||||
{"parallel_replicas_mark_segment_size", 128, 0, "Value for this setting now determined automatically"},
|
||||
{"database_replicated_allow_replicated_engine_arguments", 1, 0, "Don't allow explicit arguments by default"},
|
||||
{"database_replicated_allow_explicit_uuid", 0, 0, "Added a new setting to disallow explicitly specifying table UUID"},
|
||||
{"database_replicated_allow_explicit_uuid", 1, 0, "Added a new setting to disallow explicitly specifying table UUID"},
|
||||
{"parallel_replicas_local_plan", false, false, "Use local plan for local replica in a query with parallel replicas"},
|
||||
{"join_to_sort_minimum_perkey_rows", 0, 40, "The lower limit of per-key average rows in the right table to determine whether to rerange the right table by key in left or inner join. This setting ensures that the optimization is not applied for sparse table keys"},
|
||||
{"join_to_sort_maximum_table_rows", 0, 10000, "The maximum number of rows in the right table to determine whether to rerange the right table by key in left or inner join"},
|
||||
|
@ -31,7 +31,7 @@ CachedObjectStorage::CachedObjectStorage(
|
||||
|
||||
FileCache::Key CachedObjectStorage::getCacheKey(const std::string & path) const
|
||||
{
|
||||
return cache->createKeyForPath(path);
|
||||
return FileCacheKey::fromPath(path);
|
||||
}
|
||||
|
||||
ObjectStorageKey
|
||||
@ -71,7 +71,7 @@ std::unique_ptr<ReadBufferFromFileBase> CachedObjectStorage::readObject( /// NOL
|
||||
{
|
||||
if (cache->isInitialized())
|
||||
{
|
||||
auto cache_key = cache->createKeyForPath(object.remote_path);
|
||||
auto cache_key = FileCacheKey::fromPath(object.remote_path);
|
||||
auto global_context = Context::getGlobalContextInstance();
|
||||
auto modified_read_settings = read_settings.withNestedBuffer();
|
||||
|
||||
|
@ -122,11 +122,6 @@ FileCache::FileCache(const std::string & cache_name, const FileCacheSettings & s
|
||||
query_limit = std::make_unique<FileCacheQueryLimit>();
|
||||
}
|
||||
|
||||
FileCache::Key FileCache::createKeyForPath(const String & path)
|
||||
{
|
||||
return Key(path);
|
||||
}
|
||||
|
||||
const FileCache::UserInfo & FileCache::getCommonUser()
|
||||
{
|
||||
static UserInfo user(getCommonUserID(), 0);
|
||||
@ -1168,7 +1163,7 @@ void FileCache::removeFileSegment(const Key & key, size_t offset, const UserID &
|
||||
|
||||
void FileCache::removePathIfExists(const String & path, const UserID & user_id)
|
||||
{
|
||||
removeKeyIfExists(createKeyForPath(path), user_id);
|
||||
removeKeyIfExists(Key::fromPath(path), user_id);
|
||||
}
|
||||
|
||||
void FileCache::removeAllReleasable(const UserID & user_id)
|
||||
|
@ -88,8 +88,6 @@ public:
|
||||
|
||||
const String & getBasePath() const;
|
||||
|
||||
static Key createKeyForPath(const String & path);
|
||||
|
||||
static const UserInfo & getCommonUser();
|
||||
|
||||
static const UserInfo & getInternalUser();
|
||||
|
@ -1,5 +1,6 @@
|
||||
#include "FileCacheFactory.h"
|
||||
#include "FileCache.h"
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -43,6 +44,16 @@ FileCacheFactory::CacheByName FileCacheFactory::getAll()
|
||||
return caches_by_name;
|
||||
}
|
||||
|
||||
FileCachePtr FileCacheFactory::get(const std::string & cache_name)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
|
||||
auto it = caches_by_name.find(cache_name);
|
||||
if (it == caches_by_name.end())
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no cache by name `{}`", cache_name);
|
||||
return it->second->cache;
|
||||
}
|
||||
|
||||
FileCachePtr FileCacheFactory::getOrCreate(
|
||||
const std::string & cache_name,
|
||||
const FileCacheSettings & file_cache_settings,
|
||||
@ -202,4 +213,20 @@ void FileCacheFactory::clear()
|
||||
caches_by_name.clear();
|
||||
}
|
||||
|
||||
void FileCacheFactory::loadDefaultCaches(const Poco::Util::AbstractConfiguration & config)
|
||||
{
|
||||
Poco::Util::AbstractConfiguration::Keys cache_names;
|
||||
config.keys(FILECACHE_DEFAULT_CONFIG_PATH, cache_names);
|
||||
auto * log = &Poco::Logger::get("FileCacheFactory");
|
||||
LOG_DEBUG(log, "Will load {} caches from default cache config", cache_names.size());
|
||||
for (const auto & name : cache_names)
|
||||
{
|
||||
FileCacheSettings settings;
|
||||
const auto & config_path = fmt::format("{}.{}", FILECACHE_DEFAULT_CONFIG_PATH, name);
|
||||
settings.loadFromConfig(config, config_path);
|
||||
auto cache = getOrCreate(name, settings, config_path);
|
||||
cache->initialize();
|
||||
LOG_DEBUG(log, "Loaded cache `{}` from default cache config", name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -44,6 +44,8 @@ public:
|
||||
const FileCacheSettings & file_cache_settings,
|
||||
const std::string & config_path);
|
||||
|
||||
FileCachePtr get(const std::string & cache_name);
|
||||
|
||||
FileCachePtr create(
|
||||
const std::string & cache_name,
|
||||
const FileCacheSettings & file_cache_settings,
|
||||
@ -53,8 +55,12 @@ public:
|
||||
|
||||
FileCacheDataPtr getByName(const std::string & cache_name);
|
||||
|
||||
void loadDefaultCaches(const Poco::Util::AbstractConfiguration & config);
|
||||
|
||||
void updateSettingsFromConfig(const Poco::Util::AbstractConfiguration & config);
|
||||
|
||||
void remove(FileCachePtr cache);
|
||||
|
||||
void clear();
|
||||
|
||||
private:
|
||||
|
@ -12,11 +12,6 @@ namespace ErrorCodes
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
FileCacheKey::FileCacheKey(const std::string & path)
|
||||
: key(sipHash128(path.data(), path.size()))
|
||||
{
|
||||
}
|
||||
|
||||
FileCacheKey::FileCacheKey(const UInt128 & key_)
|
||||
: key(key_)
|
||||
{
|
||||
@ -32,6 +27,16 @@ FileCacheKey FileCacheKey::random()
|
||||
return FileCacheKey(UUIDHelpers::generateV4().toUnderType());
|
||||
}
|
||||
|
||||
FileCacheKey FileCacheKey::fromPath(const std::string & path)
|
||||
{
|
||||
return FileCacheKey(sipHash128(path.data(), path.size()));
|
||||
}
|
||||
|
||||
FileCacheKey FileCacheKey::fromKey(const UInt128 & key)
|
||||
{
|
||||
return FileCacheKey(key);
|
||||
}
|
||||
|
||||
FileCacheKey FileCacheKey::fromKeyString(const std::string & key_str)
|
||||
{
|
||||
if (key_str.size() != 32)
|
||||
|
@ -14,16 +14,16 @@ struct FileCacheKey
|
||||
|
||||
FileCacheKey() = default;
|
||||
|
||||
explicit FileCacheKey(const std::string & path);
|
||||
|
||||
explicit FileCacheKey(const UInt128 & key_);
|
||||
|
||||
static FileCacheKey random();
|
||||
static FileCacheKey fromPath(const std::string & path);
|
||||
static FileCacheKey fromKey(const UInt128 & key);
|
||||
static FileCacheKey fromKeyString(const std::string & key_str);
|
||||
|
||||
bool operator==(const FileCacheKey & other) const { return key == other.key; }
|
||||
bool operator<(const FileCacheKey & other) const { return key < other.key; }
|
||||
|
||||
static FileCacheKey fromKeyString(const std::string & key_str);
|
||||
private:
|
||||
explicit FileCacheKey(const UInt128 & key_);
|
||||
};
|
||||
|
||||
using FileCacheKeyAndOffset = std::pair<FileCacheKey, size_t>;
|
||||
|
@ -15,10 +15,12 @@ static constexpr size_t FILECACHE_BYPASS_THRESHOLD = 256 * 1024 * 1024;
|
||||
static constexpr double FILECACHE_DEFAULT_FREE_SPACE_SIZE_RATIO = 0; /// Disabled.
|
||||
static constexpr double FILECACHE_DEFAULT_FREE_SPACE_ELEMENTS_RATIO = 0; /// Disabled.
|
||||
static constexpr int FILECACHE_DEFAULT_FREE_SPACE_REMOVE_BATCH = 10;
|
||||
static constexpr auto FILECACHE_DEFAULT_CONFIG_PATH = "filesystem_caches";
|
||||
|
||||
class FileCache;
|
||||
using FileCachePtr = std::shared_ptr<FileCache>;
|
||||
|
||||
struct FileCacheSettings;
|
||||
struct FileCacheKey;
|
||||
|
||||
}
|
||||
|
@ -155,7 +155,7 @@ namespace Setting
|
||||
extern const SettingsBool use_query_cache;
|
||||
extern const SettingsBool wait_for_async_insert;
|
||||
extern const SettingsSeconds wait_for_async_insert_timeout;
|
||||
extern const SettingsBool enable_secure_identifiers;
|
||||
extern const SettingsBool enforce_strict_identifier_format;
|
||||
}
|
||||
|
||||
namespace ErrorCodes
|
||||
@ -999,12 +999,12 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
|
||||
InterpreterSetQuery::applySettingsFromQuery(ast, context);
|
||||
validateAnalyzerSettings(ast, settings[Setting::allow_experimental_analyzer]);
|
||||
|
||||
if (settings[Setting::enable_secure_identifiers])
|
||||
if (settings[Setting::enforce_strict_identifier_format])
|
||||
{
|
||||
WriteBufferFromOwnString buf;
|
||||
IAST::FormatSettings enable_secure_identifiers_settings(buf, true);
|
||||
enable_secure_identifiers_settings.enable_secure_identifiers = true;
|
||||
ast->format(enable_secure_identifiers_settings);
|
||||
IAST::FormatSettings enforce_strict_identifier_format_settings(buf, true);
|
||||
enforce_strict_identifier_format_settings.enforce_strict_identifier_format = true;
|
||||
ast->format(enforce_strict_identifier_format_settings);
|
||||
}
|
||||
|
||||
if (auto * insert_query = ast->as<ASTInsertQuery>())
|
||||
|
@ -372,7 +372,7 @@ TEST_F(FileCacheTest, LRUPolicy)
|
||||
std::cerr << "Step 1\n";
|
||||
auto cache = DB::FileCache("1", settings);
|
||||
cache.initialize();
|
||||
auto key = DB::FileCache::createKeyForPath("key1");
|
||||
auto key = DB::FileCacheKey::fromPath("key1");
|
||||
|
||||
auto get_or_set = [&](size_t offset, size_t size)
|
||||
{
|
||||
@ -736,7 +736,7 @@ TEST_F(FileCacheTest, LRUPolicy)
|
||||
|
||||
auto cache2 = DB::FileCache("2", settings);
|
||||
cache2.initialize();
|
||||
auto key = DB::FileCache::createKeyForPath("key1");
|
||||
auto key = DB::FileCacheKey::fromPath("key1");
|
||||
|
||||
/// Get [2, 29]
|
||||
assertEqual(
|
||||
@ -755,7 +755,7 @@ TEST_F(FileCacheTest, LRUPolicy)
|
||||
fs::create_directories(settings2.base_path);
|
||||
auto cache2 = DB::FileCache("3", settings2);
|
||||
cache2.initialize();
|
||||
auto key = DB::FileCache::createKeyForPath("key1");
|
||||
auto key = DB::FileCacheKey::fromPath("key1");
|
||||
|
||||
/// Get [0, 24]
|
||||
assertEqual(
|
||||
@ -770,7 +770,7 @@ TEST_F(FileCacheTest, LRUPolicy)
|
||||
|
||||
auto cache = FileCache("4", settings);
|
||||
cache.initialize();
|
||||
const auto key = FileCache::createKeyForPath("key10");
|
||||
const auto key = FileCacheKey::fromPath("key10");
|
||||
const auto key_path = cache.getKeyPath(key, user);
|
||||
|
||||
cache.removeAllReleasable(user.user_id);
|
||||
@ -794,7 +794,7 @@ TEST_F(FileCacheTest, LRUPolicy)
|
||||
|
||||
auto cache = DB::FileCache("5", settings);
|
||||
cache.initialize();
|
||||
const auto key = FileCache::createKeyForPath("key10");
|
||||
const auto key = FileCacheKey::fromPath("key10");
|
||||
const auto key_path = cache.getKeyPath(key, user);
|
||||
|
||||
cache.removeAllReleasable(user.user_id);
|
||||
@ -833,7 +833,7 @@ TEST_F(FileCacheTest, writeBuffer)
|
||||
segment_settings.kind = FileSegmentKind::Ephemeral;
|
||||
segment_settings.unbounded = true;
|
||||
|
||||
auto cache_key = FileCache::createKeyForPath(key);
|
||||
auto cache_key = FileCacheKey::fromPath(key);
|
||||
auto holder = cache.set(cache_key, 0, 3, segment_settings, user);
|
||||
/// The same is done in TemporaryDataOnDisk::createStreamToCacheFile.
|
||||
std::filesystem::create_directories(cache.getKeyPath(cache_key, user));
|
||||
@ -961,7 +961,7 @@ TEST_F(FileCacheTest, temporaryData)
|
||||
const auto user = FileCache::getCommonUser();
|
||||
auto tmp_data_scope = std::make_shared<TemporaryDataOnDiskScope>(nullptr, &file_cache, TemporaryDataOnDiskSettings{});
|
||||
|
||||
auto some_data_holder = file_cache.getOrSet(FileCache::createKeyForPath("some_data"), 0, 5_KiB, 5_KiB, CreateFileSegmentSettings{}, 0, user);
|
||||
auto some_data_holder = file_cache.getOrSet(FileCacheKey::fromPath("some_data"), 0, 5_KiB, 5_KiB, CreateFileSegmentSettings{}, 0, user);
|
||||
|
||||
{
|
||||
ASSERT_EQ(some_data_holder->size(), 5);
|
||||
@ -1103,7 +1103,7 @@ TEST_F(FileCacheTest, CachedReadBuffer)
|
||||
auto cache = std::make_shared<DB::FileCache>("8", settings);
|
||||
cache->initialize();
|
||||
|
||||
auto key = cache->createKeyForPath(file_path);
|
||||
auto key = DB::FileCacheKey::fromPath(file_path);
|
||||
const auto user = FileCache::getCommonUser();
|
||||
|
||||
{
|
||||
@ -1219,7 +1219,7 @@ TEST_F(FileCacheTest, SLRUPolicy)
|
||||
{
|
||||
auto cache = DB::FileCache(std::to_string(++file_cache_name), settings);
|
||||
cache.initialize();
|
||||
auto key = FileCache::createKeyForPath("key1");
|
||||
auto key = FileCacheKey::fromPath("key1");
|
||||
|
||||
auto add_range = [&](size_t offset, size_t size)
|
||||
{
|
||||
@ -1342,7 +1342,7 @@ TEST_F(FileCacheTest, SLRUPolicy)
|
||||
|
||||
std::string data1(15, '*');
|
||||
auto file1 = write_file("test1", data1);
|
||||
auto key1 = cache->createKeyForPath(file1);
|
||||
auto key1 = DB::FileCacheKey::fromPath(file1);
|
||||
|
||||
read_and_check(file1, key1, data1);
|
||||
|
||||
@ -1358,7 +1358,7 @@ TEST_F(FileCacheTest, SLRUPolicy)
|
||||
|
||||
std::string data2(10, '*');
|
||||
auto file2 = write_file("test2", data2);
|
||||
auto key2 = cache->createKeyForPath(file2);
|
||||
auto key2 = DB::FileCacheKey::fromPath(file2);
|
||||
|
||||
read_and_check(file2, key2, data2);
|
||||
|
||||
|
@ -8,6 +8,7 @@
|
||||
#include <Poco/String.h>
|
||||
#include <Common/SensitiveDataMasker.h>
|
||||
#include <Common/SipHash.h>
|
||||
#include <Common/StringUtils.h>
|
||||
#include <algorithm>
|
||||
|
||||
namespace DB
|
||||
@ -265,14 +266,14 @@ void IAST::FormatSettings::writeIdentifier(const String & name, bool ambiguous)
|
||||
|
||||
void IAST::FormatSettings::checkIdentifier(const String & name) const
|
||||
{
|
||||
if (enable_secure_identifiers)
|
||||
if (enforce_strict_identifier_format)
|
||||
{
|
||||
bool is_secure_identifier = std::all_of(name.begin(), name.end(), [](char ch) { return std::isalnum(ch) || ch == '_'; });
|
||||
if (!is_secure_identifier)
|
||||
bool is_word_char_identifier = std::all_of(name.begin(), name.end(), isWordCharASCII);
|
||||
if (!is_word_char_identifier)
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::BAD_ARGUMENTS,
|
||||
"Not a secure identifier: `{}`, a secure identifier must contain only underscore and alphanumeric characters",
|
||||
"Identifier '{}' contains characters other than alphanumeric and cannot be when enforce_strict_identifier_format is enabled",
|
||||
name);
|
||||
}
|
||||
}
|
||||
|
@ -202,7 +202,7 @@ public:
|
||||
char nl_or_ws; /// Newline or whitespace.
|
||||
LiteralEscapingStyle literal_escaping_style;
|
||||
bool print_pretty_type_names;
|
||||
bool enable_secure_identifiers;
|
||||
bool enforce_strict_identifier_format;
|
||||
|
||||
explicit FormatSettings(
|
||||
WriteBuffer & ostr_,
|
||||
@ -213,7 +213,7 @@ public:
|
||||
bool show_secrets_ = true,
|
||||
LiteralEscapingStyle literal_escaping_style_ = LiteralEscapingStyle::Regular,
|
||||
bool print_pretty_type_names_ = false,
|
||||
bool enable_secure_identifiers_ = false)
|
||||
bool enforce_strict_identifier_format_ = false)
|
||||
: ostr(ostr_)
|
||||
, one_line(one_line_)
|
||||
, hilite(hilite_)
|
||||
@ -223,7 +223,7 @@ public:
|
||||
, nl_or_ws(one_line ? ' ' : '\n')
|
||||
, literal_escaping_style(literal_escaping_style_)
|
||||
, print_pretty_type_names(print_pretty_type_names_)
|
||||
, enable_secure_identifiers(enable_secure_identifiers_)
|
||||
, enforce_strict_identifier_format(enforce_strict_identifier_format_)
|
||||
{
|
||||
}
|
||||
|
||||
@ -237,7 +237,7 @@ public:
|
||||
, nl_or_ws(other.nl_or_ws)
|
||||
, literal_escaping_style(other.literal_escaping_style)
|
||||
, print_pretty_type_names(other.print_pretty_type_names)
|
||||
, enable_secure_identifiers(other.enable_secure_identifiers)
|
||||
, enforce_strict_identifier_format(other.enforce_strict_identifier_format)
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -365,11 +365,11 @@ public:
|
||||
|
||||
/// Set limits for current storage.
|
||||
/// Different limits may be applied to different storages, we need to keep it per processor.
|
||||
/// This method is need to be override only for sources.
|
||||
/// This method needs to be overridden only for sources.
|
||||
virtual void setStorageLimits(const std::shared_ptr<const StorageLimitsList> & /*storage_limits*/) {}
|
||||
|
||||
/// This method is called for every processor without input ports.
|
||||
/// Processor can return a new progress for the last read operation.
|
||||
/// Processor can return new progress for the last read operation.
|
||||
/// You should zero internal counters in the call, in order to make in idempotent.
|
||||
virtual std::optional<ReadProgress> getReadProgress() { return std::nullopt; }
|
||||
|
||||
|
@ -1495,11 +1495,10 @@ void WindowTransform::work()
|
||||
}
|
||||
}
|
||||
|
||||
struct WindowFunctionRank final : public WindowFunction
|
||||
struct WindowFunctionRank final : public StatelessWindowFunction
|
||||
{
|
||||
WindowFunctionRank(const std::string & name_,
|
||||
const DataTypes & argument_types_, const Array & parameters_)
|
||||
: WindowFunction(name_, argument_types_, parameters_, std::make_shared<DataTypeUInt64>())
|
||||
WindowFunctionRank(const std::string & name_, const DataTypes & argument_types_, const Array & parameters_)
|
||||
: StatelessWindowFunction(name_, argument_types_, parameters_, std::make_shared<DataTypeUInt64>())
|
||||
{}
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
@ -1514,11 +1513,10 @@ struct WindowFunctionRank final : public WindowFunction
|
||||
}
|
||||
};
|
||||
|
||||
struct WindowFunctionDenseRank final : public WindowFunction
|
||||
struct WindowFunctionDenseRank final : public StatelessWindowFunction
|
||||
{
|
||||
WindowFunctionDenseRank(const std::string & name_,
|
||||
const DataTypes & argument_types_, const Array & parameters_)
|
||||
: WindowFunction(name_, argument_types_, parameters_, std::make_shared<DataTypeUInt64>())
|
||||
WindowFunctionDenseRank(const std::string & name_, const DataTypes & argument_types_, const Array & parameters_)
|
||||
: StatelessWindowFunction(name_, argument_types_, parameters_, std::make_shared<DataTypeUInt64>())
|
||||
{}
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
@ -1716,7 +1714,7 @@ struct WindowFunctionExponentialTimeDecayedSum final : public StatefulWindowFunc
|
||||
const Float64 decay_length;
|
||||
};
|
||||
|
||||
struct WindowFunctionExponentialTimeDecayedMax final : public WindowFunction
|
||||
struct WindowFunctionExponentialTimeDecayedMax final : public StatelessWindowFunction
|
||||
{
|
||||
static constexpr size_t ARGUMENT_VALUE = 0;
|
||||
static constexpr size_t ARGUMENT_TIME = 1;
|
||||
@ -1731,9 +1729,8 @@ struct WindowFunctionExponentialTimeDecayedMax final : public WindowFunction
|
||||
return applyVisitor(FieldVisitorConvertToNumber<Float64>(), parameters_[0]);
|
||||
}
|
||||
|
||||
WindowFunctionExponentialTimeDecayedMax(const std::string & name_,
|
||||
const DataTypes & argument_types_, const Array & parameters_)
|
||||
: WindowFunction(name_, argument_types_, parameters_, std::make_shared<DataTypeFloat64>())
|
||||
WindowFunctionExponentialTimeDecayedMax(const std::string & name_, const DataTypes & argument_types_, const Array & parameters_)
|
||||
: StatelessWindowFunction(name_, argument_types_, parameters_, std::make_shared<DataTypeFloat64>())
|
||||
, decay_length(getDecayLength(parameters_, name_))
|
||||
{
|
||||
if (argument_types.size() != 2)
|
||||
@ -1991,11 +1988,10 @@ struct WindowFunctionExponentialTimeDecayedAvg final : public StatefulWindowFunc
|
||||
const Float64 decay_length;
|
||||
};
|
||||
|
||||
struct WindowFunctionRowNumber final : public WindowFunction
|
||||
struct WindowFunctionRowNumber final : public StatelessWindowFunction
|
||||
{
|
||||
WindowFunctionRowNumber(const std::string & name_,
|
||||
const DataTypes & argument_types_, const Array & parameters_)
|
||||
: WindowFunction(name_, argument_types_, parameters_, std::make_shared<DataTypeUInt64>())
|
||||
WindowFunctionRowNumber(const std::string & name_, const DataTypes & argument_types_, const Array & parameters_)
|
||||
: StatelessWindowFunction(name_, argument_types_, parameters_, std::make_shared<DataTypeUInt64>())
|
||||
{}
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
@ -2273,13 +2269,12 @@ public:
|
||||
|
||||
// ClickHouse-specific variant of lag/lead that respects the window frame.
|
||||
template <bool is_lead>
|
||||
struct WindowFunctionLagLeadInFrame final : public WindowFunction
|
||||
struct WindowFunctionLagLeadInFrame final : public StatelessWindowFunction
|
||||
{
|
||||
FunctionBasePtr func_cast = nullptr;
|
||||
|
||||
WindowFunctionLagLeadInFrame(const std::string & name_,
|
||||
const DataTypes & argument_types_, const Array & parameters_)
|
||||
: WindowFunction(name_, argument_types_, parameters_, createResultType(argument_types_, name_))
|
||||
WindowFunctionLagLeadInFrame(const std::string & name_, const DataTypes & argument_types_, const Array & parameters_)
|
||||
: StatelessWindowFunction(name_, argument_types_, parameters_, createResultType(argument_types_, name_))
|
||||
{
|
||||
if (!parameters.empty())
|
||||
{
|
||||
@ -2427,11 +2422,10 @@ struct WindowFunctionLagLeadInFrame final : public WindowFunction
|
||||
}
|
||||
};
|
||||
|
||||
struct WindowFunctionNthValue final : public WindowFunction
|
||||
struct WindowFunctionNthValue final : public StatelessWindowFunction
|
||||
{
|
||||
WindowFunctionNthValue(const std::string & name_,
|
||||
const DataTypes & argument_types_, const Array & parameters_)
|
||||
: WindowFunction(name_, argument_types_, parameters_, createResultType(name_, argument_types_))
|
||||
WindowFunctionNthValue(const std::string & name_, const DataTypes & argument_types_, const Array & parameters_)
|
||||
: StatelessWindowFunction(name_, argument_types_, parameters_, createResultType(name_, argument_types_))
|
||||
{
|
||||
if (!parameters.empty())
|
||||
{
|
||||
|
@ -14,6 +14,7 @@
|
||||
#include <IO/ReadBufferFromFileBase.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <Storages/ObjectStorage/DataLakes/Common.h>
|
||||
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
|
||||
|
||||
#include <Processors/Formats/Impl/ArrowBufferedStreams.h>
|
||||
#include <Processors/Formats/Impl/ParquetBlockInputFormat.h>
|
||||
@ -185,7 +186,8 @@ struct DeltaLakeMetadataImpl
|
||||
std::set<String> & result)
|
||||
{
|
||||
auto read_settings = context->getReadSettings();
|
||||
auto buf = object_storage->readObject(StoredObject(metadata_file_path), read_settings);
|
||||
StorageObjectStorageSource::ObjectInfo object_info(metadata_file_path);
|
||||
auto buf = StorageObjectStorageSource::createReadBuffer(object_info, object_storage, context, log);
|
||||
|
||||
char c;
|
||||
while (!buf->eof())
|
||||
@ -492,7 +494,8 @@ struct DeltaLakeMetadataImpl
|
||||
|
||||
String json_str;
|
||||
auto read_settings = context->getReadSettings();
|
||||
auto buf = object_storage->readObject(StoredObject(last_checkpoint_file), read_settings);
|
||||
StorageObjectStorageSource::ObjectInfo object_info(last_checkpoint_file);
|
||||
auto buf = StorageObjectStorageSource::createReadBuffer(object_info, object_storage, context, log);
|
||||
readJSONObjectPossiblyInvalid(json_str, *buf);
|
||||
|
||||
const JSON json(json_str);
|
||||
@ -557,7 +560,8 @@ struct DeltaLakeMetadataImpl
|
||||
LOG_TRACE(log, "Using checkpoint file: {}", checkpoint_path.string());
|
||||
|
||||
auto read_settings = context->getReadSettings();
|
||||
auto buf = object_storage->readObject(StoredObject(checkpoint_path), read_settings);
|
||||
StorageObjectStorageSource::ObjectInfo object_info(checkpoint_path);
|
||||
auto buf = StorageObjectStorageSource::createReadBuffer(object_info, object_storage, context, log);
|
||||
auto format_settings = getFormatSettings(context);
|
||||
|
||||
/// Force nullable, because this parquet file for some reason does not have nullable
|
||||
|
@ -26,6 +26,7 @@
|
||||
#include <Processors/Formats/Impl/AvroRowInputFormat.h>
|
||||
#include <Storages/ObjectStorage/DataLakes/IcebergMetadata.h>
|
||||
#include <Storages/ObjectStorage/DataLakes/Common.h>
|
||||
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
|
||||
|
||||
#include <Poco/JSON/Array.h>
|
||||
#include <Poco/JSON/Object.h>
|
||||
@ -387,9 +388,13 @@ DataLakeMetadataPtr IcebergMetadata::create(
|
||||
ContextPtr local_context)
|
||||
{
|
||||
const auto [metadata_version, metadata_file_path] = getMetadataFileAndVersion(object_storage, *configuration);
|
||||
LOG_DEBUG(getLogger("IcebergMetadata"), "Parse metadata {}", metadata_file_path);
|
||||
auto read_settings = local_context->getReadSettings();
|
||||
auto buf = object_storage->readObject(StoredObject(metadata_file_path), read_settings);
|
||||
|
||||
auto log = getLogger("IcebergMetadata");
|
||||
LOG_DEBUG(log, "Parse metadata {}", metadata_file_path);
|
||||
|
||||
StorageObjectStorageSource::ObjectInfo object_info(metadata_file_path);
|
||||
auto buf = StorageObjectStorageSource::createReadBuffer(object_info, object_storage, local_context, log);
|
||||
|
||||
String json_str;
|
||||
readJSONObjectPossiblyInvalid(json_str, *buf);
|
||||
|
||||
@ -456,8 +461,8 @@ Strings IcebergMetadata::getDataFiles() const
|
||||
LOG_TEST(log, "Collect manifest files from manifest list {}", manifest_list_file);
|
||||
|
||||
auto context = getContext();
|
||||
auto read_settings = context->getReadSettings();
|
||||
auto manifest_list_buf = object_storage->readObject(StoredObject(manifest_list_file), read_settings);
|
||||
StorageObjectStorageSource::ObjectInfo object_info(manifest_list_file);
|
||||
auto manifest_list_buf = StorageObjectStorageSource::createReadBuffer(object_info, object_storage, context, log);
|
||||
auto manifest_list_file_reader = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*manifest_list_buf));
|
||||
|
||||
auto data_type = AvroSchemaReader::avroNodeToDataType(manifest_list_file_reader->dataSchema().root()->leafAt(0));
|
||||
@ -487,7 +492,8 @@ Strings IcebergMetadata::getDataFiles() const
|
||||
{
|
||||
LOG_TEST(log, "Process manifest file {}", manifest_file);
|
||||
|
||||
auto buffer = object_storage->readObject(StoredObject(manifest_file), read_settings);
|
||||
StorageObjectStorageSource::ObjectInfo manifest_object_info(manifest_file);
|
||||
auto buffer = StorageObjectStorageSource::createReadBuffer(manifest_object_info, object_storage, context, log);
|
||||
auto manifest_file_reader = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*buffer));
|
||||
|
||||
/// Manifest file should always have table schema in avro file metadata. By now we don't support tables with evolved schema,
|
||||
|
@ -150,7 +150,7 @@ std::unique_ptr<ReadBuffer> ReadBufferIterator::recreateLastReadBuffer()
|
||||
auto context = getContext();
|
||||
|
||||
const auto & path = current_object_info->isArchive() ? current_object_info->getPathToArchive() : current_object_info->getPath();
|
||||
auto impl = object_storage->readObject(StoredObject(path), context->getReadSettings());
|
||||
auto impl = StorageObjectStorageSource::createReadBuffer(*current_object_info, object_storage, context, getLogger("ReadBufferIterator"));
|
||||
|
||||
const auto compression_method = chooseCompressionMethod(current_object_info->getFileName(), configuration->compression_method);
|
||||
const auto zstd_window = static_cast<int>(context->getSettingsRef()[Setting::zstd_window_log_max]);
|
||||
@ -276,11 +276,7 @@ ReadBufferIterator::Data ReadBufferIterator::next()
|
||||
else
|
||||
{
|
||||
compression_method = chooseCompressionMethod(filename, configuration->compression_method);
|
||||
read_buf = object_storage->readObject(
|
||||
StoredObject(current_object_info->getPath()),
|
||||
getContext()->getReadSettings(),
|
||||
{},
|
||||
current_object_info->metadata->size_bytes);
|
||||
read_buf = StorageObjectStorageSource::createReadBuffer(*current_object_info, object_storage, getContext(), getLogger("ReadBufferIterator"));
|
||||
}
|
||||
|
||||
if (!query_settings.skip_empty_files || !read_buf->eof())
|
||||
|
@ -7,6 +7,9 @@
|
||||
#include <Processors/Executors/PullingPipelineExecutor.h>
|
||||
#include <Processors/Transforms/ExtractColumnsTransform.h>
|
||||
#include <IO/ReadBufferFromFileBase.h>
|
||||
#include <Interpreters/Cache/FileCacheFactory.h>
|
||||
#include <Interpreters/Cache/FileCache.h>
|
||||
#include <Disks/IO/CachedOnDiskReadBufferFromFile.h>
|
||||
#include <IO/Archives/createArchiveReader.h>
|
||||
#include <Formats/FormatFactory.h>
|
||||
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
|
||||
@ -37,6 +40,7 @@ namespace Setting
|
||||
extern const SettingsUInt64 max_download_buffer_size;
|
||||
extern const SettingsMaxThreads max_threads;
|
||||
extern const SettingsBool use_cache_for_count_from_files;
|
||||
extern const SettingsString filesystem_cache_name;
|
||||
}
|
||||
|
||||
namespace ErrorCodes
|
||||
@ -420,44 +424,110 @@ std::future<StorageObjectStorageSource::ReaderHolder> StorageObjectStorageSource
|
||||
return create_reader_scheduler([=, this] { return createReader(); }, Priority{});
|
||||
}
|
||||
|
||||
std::unique_ptr<ReadBuffer> StorageObjectStorageSource::createReadBuffer(
|
||||
const ObjectInfo & object_info, const ObjectStoragePtr & object_storage, const ContextPtr & context_, const LoggerPtr & log)
|
||||
std::unique_ptr<ReadBufferFromFileBase> StorageObjectStorageSource::createReadBuffer(
|
||||
ObjectInfo & object_info, const ObjectStoragePtr & object_storage, const ContextPtr & context_, const LoggerPtr & log)
|
||||
{
|
||||
const auto & settings = context_->getSettingsRef();
|
||||
const auto & read_settings = context_->getReadSettings();
|
||||
|
||||
const auto filesystem_cache_name = settings[Setting::filesystem_cache_name].value;
|
||||
bool use_cache = read_settings.enable_filesystem_cache
|
||||
&& !filesystem_cache_name.empty()
|
||||
&& (object_storage->getType() == ObjectStorageType::Azure
|
||||
|| object_storage->getType() == ObjectStorageType::S3);
|
||||
|
||||
if (!object_info.metadata)
|
||||
{
|
||||
if (!use_cache)
|
||||
{
|
||||
return object_storage->readObject(StoredObject(object_info.getPath()), read_settings);
|
||||
}
|
||||
object_info.metadata = object_storage->getObjectMetadata(object_info.getPath());
|
||||
}
|
||||
|
||||
const auto & object_size = object_info.metadata->size_bytes;
|
||||
|
||||
auto read_settings = context_->getReadSettings().adjustBufferSize(object_size);
|
||||
auto modified_read_settings = read_settings.adjustBufferSize(object_size);
|
||||
/// FIXME: Changing this setting to default value breaks something around parquet reading
|
||||
read_settings.remote_read_min_bytes_for_seek = read_settings.remote_fs_buffer_size;
|
||||
modified_read_settings.remote_read_min_bytes_for_seek = modified_read_settings.remote_fs_buffer_size;
|
||||
/// User's object may change, don't cache it.
|
||||
read_settings.enable_filesystem_cache = false;
|
||||
read_settings.use_page_cache_for_disks_without_file_cache = false;
|
||||
|
||||
const bool object_too_small = object_size <= 2 * context_->getSettingsRef()[Setting::max_download_buffer_size];
|
||||
const bool use_prefetch = object_too_small
|
||||
&& read_settings.remote_fs_method == RemoteFSReadMethod::threadpool
|
||||
&& read_settings.remote_fs_prefetch;
|
||||
|
||||
if (use_prefetch)
|
||||
read_settings.remote_read_buffer_use_external_buffer = true;
|
||||
|
||||
auto impl = object_storage->readObject(StoredObject(object_info.getPath(), "", object_size), read_settings);
|
||||
modified_read_settings.use_page_cache_for_disks_without_file_cache = false;
|
||||
|
||||
// Create a read buffer that will prefetch the first ~1 MB of the file.
|
||||
// When reading lots of tiny files, this prefetching almost doubles the throughput.
|
||||
// For bigger files, parallel reading is more useful.
|
||||
if (!use_prefetch)
|
||||
const bool object_too_small = object_size <= 2 * context_->getSettingsRef()[Setting::max_download_buffer_size];
|
||||
const bool use_prefetch = object_too_small
|
||||
&& modified_read_settings.remote_fs_method == RemoteFSReadMethod::threadpool
|
||||
&& modified_read_settings.remote_fs_prefetch;
|
||||
|
||||
/// FIXME: Use async buffer if use_cache,
|
||||
/// because CachedOnDiskReadBufferFromFile does not work as an independent buffer currently.
|
||||
const bool use_async_buffer = use_prefetch || use_cache;
|
||||
|
||||
if (use_async_buffer)
|
||||
modified_read_settings.remote_read_buffer_use_external_buffer = true;
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> impl;
|
||||
if (use_cache)
|
||||
{
|
||||
if (object_info.metadata->etag.empty())
|
||||
{
|
||||
LOG_WARNING(log, "Cannot use filesystem cache, no etag specified");
|
||||
}
|
||||
else
|
||||
{
|
||||
SipHash hash;
|
||||
hash.update(object_info.getPath());
|
||||
hash.update(object_info.metadata->etag);
|
||||
|
||||
const auto cache_key = FileCacheKey::fromKey(hash.get128());
|
||||
auto cache = FileCacheFactory::instance().get(filesystem_cache_name);
|
||||
|
||||
auto read_buffer_creator = [path = object_info.getPath(), object_size, modified_read_settings, object_storage]()
|
||||
{
|
||||
return object_storage->readObject(StoredObject(path, "", object_size), modified_read_settings);
|
||||
};
|
||||
|
||||
impl = std::make_unique<CachedOnDiskReadBufferFromFile>(
|
||||
object_info.getPath(),
|
||||
cache_key,
|
||||
cache,
|
||||
FileCache::getCommonUser(),
|
||||
read_buffer_creator,
|
||||
modified_read_settings,
|
||||
std::string(CurrentThread::getQueryId()),
|
||||
object_size,
|
||||
/* allow_seeks */true,
|
||||
/* use_external_buffer */true,
|
||||
/* read_until_position */std::nullopt,
|
||||
context_->getFilesystemCacheLog());
|
||||
|
||||
LOG_TEST(log, "Using filesystem cache `{}` (path: {}, etag: {}, hash: {})",
|
||||
filesystem_cache_name, object_info.getPath(),
|
||||
object_info.metadata->etag, toString(hash.get128()));
|
||||
}
|
||||
}
|
||||
|
||||
if (!impl)
|
||||
impl = object_storage->readObject(StoredObject(object_info.getPath(), "", object_size), modified_read_settings);
|
||||
|
||||
if (!use_async_buffer)
|
||||
return impl;
|
||||
|
||||
LOG_TRACE(log, "Downloading object of size {} with initial prefetch", object_size);
|
||||
|
||||
auto & reader = context_->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
|
||||
impl = std::make_unique<AsynchronousBoundedReadBuffer>(
|
||||
std::move(impl), reader, read_settings,
|
||||
std::move(impl), reader, modified_read_settings,
|
||||
context_->getAsyncReadCounters(),
|
||||
context_->getFilesystemReadPrefetchesLog());
|
||||
|
||||
impl->setReadUntilEnd();
|
||||
impl->prefetch(DEFAULT_PREFETCH_PRIORITY);
|
||||
if (use_prefetch)
|
||||
{
|
||||
impl->setReadUntilEnd();
|
||||
impl->prefetch(DEFAULT_PREFETCH_PRIORITY);
|
||||
}
|
||||
return impl;
|
||||
}
|
||||
|
||||
@ -787,8 +857,7 @@ StorageObjectStorageSource::ArchiveIterator::createArchiveReader(ObjectInfoPtr o
|
||||
/* path_to_archive */object_info->getPath(),
|
||||
/* archive_read_function */[=, this]()
|
||||
{
|
||||
StoredObject stored_object(object_info->getPath(), "", size);
|
||||
return object_storage->readObject(stored_object, getContext()->getReadSettings());
|
||||
return StorageObjectStorageSource::createReadBuffer(*object_info, object_storage, getContext(), logger);
|
||||
},
|
||||
/* archive_size */size);
|
||||
}
|
||||
|
@ -66,6 +66,11 @@ public:
|
||||
const ObjectInfo & object_info,
|
||||
bool include_connection_info = true);
|
||||
|
||||
static std::unique_ptr<ReadBufferFromFileBase> createReadBuffer(
|
||||
ObjectInfo & object_info,
|
||||
const ObjectStoragePtr & object_storage,
|
||||
const ContextPtr & context_,
|
||||
const LoggerPtr & log);
|
||||
protected:
|
||||
const String name;
|
||||
ObjectStoragePtr object_storage;
|
||||
@ -135,11 +140,6 @@ protected:
|
||||
ReaderHolder createReader();
|
||||
|
||||
std::future<ReaderHolder> createReaderAsync();
|
||||
static std::unique_ptr<ReadBuffer> createReadBuffer(
|
||||
const ObjectInfo & object_info,
|
||||
const ObjectStoragePtr & object_storage,
|
||||
const ContextPtr & context_,
|
||||
const LoggerPtr & log);
|
||||
|
||||
void addNumRowsToCache(const ObjectInfo & object_info, size_t num_rows);
|
||||
void lazyInitialize();
|
||||
|
@ -401,7 +401,7 @@ Chunk SystemRemoteDataPathsSource::generate()
|
||||
|
||||
if (cache)
|
||||
{
|
||||
auto cache_paths = cache->tryGetCachePaths(cache->createKeyForPath(object.remote_path));
|
||||
auto cache_paths = cache->tryGetCachePaths(FileCacheKey::fromPath(object.remote_path));
|
||||
col_cache_paths->insert(Array(cache_paths.begin(), cache_paths.end()));
|
||||
}
|
||||
else
|
||||
|
@ -292,6 +292,7 @@ def clickhouse_execute_http(
|
||||
"http_receive_timeout": timeout,
|
||||
"http_send_timeout": timeout,
|
||||
"output_format_parallel_formatting": 0,
|
||||
"max_rows_to_read": 0, # Some queries read from system.text_log which might get too big
|
||||
}
|
||||
if settings is not None:
|
||||
params.update(settings)
|
||||
|
@ -1412,7 +1412,7 @@ def test_parallel_read(cluster):
|
||||
|
||||
res = azure_query(
|
||||
node,
|
||||
f"select count() from azureBlobStorage('{connection_string}', 'cont', 'test_parallel_read.parquet')",
|
||||
f"select count() from azureBlobStorage('{connection_string}', 'cont', 'test_parallel_read.parquet') settings remote_filesystem_read_method='read'",
|
||||
)
|
||||
assert int(res) == 10000
|
||||
assert_logs_contain_with_retry(node, "AzureBlobStorage readBigAt read bytes")
|
||||
|
@ -0,0 +1,8 @@
|
||||
<clickhouse>
|
||||
<filesystem_caches>
|
||||
<cache1>
|
||||
<max_size>1Gi</max_size>
|
||||
<path>cache1</path>
|
||||
</cache1>
|
||||
</filesystem_caches>
|
||||
</clickhouse>
|
@ -5,6 +5,7 @@ import os
|
||||
import random
|
||||
import string
|
||||
import time
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
|
||||
import delta
|
||||
@ -70,7 +71,10 @@ def started_cluster():
|
||||
cluster = ClickHouseCluster(__file__, with_spark=True)
|
||||
cluster.add_instance(
|
||||
"node1",
|
||||
main_configs=["configs/config.d/named_collections.xml"],
|
||||
main_configs=[
|
||||
"configs/config.d/named_collections.xml",
|
||||
"configs/config.d/filesystem_caches.xml",
|
||||
],
|
||||
user_configs=["configs/users.d/users.xml"],
|
||||
with_minio=True,
|
||||
stay_alive=True,
|
||||
@ -826,3 +830,64 @@ def test_complex_types(started_cluster):
|
||||
f"SELECT metadata FROM deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/root/{table_name}' , 'minio', 'minio123')"
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("storage_type", ["s3"])
|
||||
def test_filesystem_cache(started_cluster, storage_type):
|
||||
instance = started_cluster.instances["node1"]
|
||||
spark = started_cluster.spark_session
|
||||
minio_client = started_cluster.minio_client
|
||||
TABLE_NAME = randomize_table_name("test_filesystem_cache")
|
||||
bucket = started_cluster.minio_bucket
|
||||
|
||||
if not minio_client.bucket_exists(bucket):
|
||||
minio_client.make_bucket(bucket)
|
||||
|
||||
parquet_data_path = create_initial_data_file(
|
||||
started_cluster,
|
||||
instance,
|
||||
"SELECT number, toString(number) FROM numbers(100)",
|
||||
TABLE_NAME,
|
||||
)
|
||||
|
||||
write_delta_from_file(spark, parquet_data_path, f"/{TABLE_NAME}")
|
||||
upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
|
||||
create_delta_table(instance, TABLE_NAME, bucket=bucket)
|
||||
|
||||
query_id = f"{TABLE_NAME}-{uuid.uuid4()}"
|
||||
instance.query(
|
||||
f"SELECT * FROM {TABLE_NAME} SETTINGS filesystem_cache_name = 'cache1'",
|
||||
query_id=query_id,
|
||||
)
|
||||
|
||||
instance.query("SYSTEM FLUSH LOGS")
|
||||
|
||||
count = int(
|
||||
instance.query(
|
||||
f"SELECT ProfileEvents['CachedReadBufferCacheWriteBytes'] FROM system.query_log WHERE query_id = '{query_id}' AND type = 'QueryFinish'"
|
||||
)
|
||||
)
|
||||
assert 0 < int(
|
||||
instance.query(
|
||||
f"SELECT ProfileEvents['S3GetObject'] FROM system.query_log WHERE query_id = '{query_id}' AND type = 'QueryFinish'"
|
||||
)
|
||||
)
|
||||
|
||||
query_id = f"{TABLE_NAME}-{uuid.uuid4()}"
|
||||
instance.query(
|
||||
f"SELECT * FROM {TABLE_NAME} SETTINGS filesystem_cache_name = 'cache1'",
|
||||
query_id=query_id,
|
||||
)
|
||||
|
||||
instance.query("SYSTEM FLUSH LOGS")
|
||||
|
||||
assert count == int(
|
||||
instance.query(
|
||||
f"SELECT ProfileEvents['CachedReadBufferReadFromCacheBytes'] FROM system.query_log WHERE query_id = '{query_id}' AND type = 'QueryFinish'"
|
||||
)
|
||||
)
|
||||
assert 0 == int(
|
||||
instance.query(
|
||||
f"SELECT ProfileEvents['S3GetObject'] FROM system.query_log WHERE query_id = '{query_id}' AND type = 'QueryFinish'"
|
||||
)
|
||||
)
|
||||
|
@ -0,0 +1,8 @@
|
||||
<clickhouse>
|
||||
<filesystem_caches>
|
||||
<cache1>
|
||||
<max_size>1Gi</max_size>
|
||||
<path>cache1</path>
|
||||
</cache1>
|
||||
</filesystem_caches>
|
||||
</clickhouse>
|
@ -72,7 +72,10 @@ def started_cluster():
|
||||
with_hdfs = False
|
||||
cluster.add_instance(
|
||||
"node1",
|
||||
main_configs=["configs/config.d/named_collections.xml"],
|
||||
main_configs=[
|
||||
"configs/config.d/named_collections.xml",
|
||||
"configs/config.d/filesystem_caches.xml",
|
||||
],
|
||||
user_configs=["configs/users.d/users.xml"],
|
||||
with_minio=True,
|
||||
with_azurite=True,
|
||||
@ -870,3 +873,66 @@ def test_restart_broken_s3(started_cluster):
|
||||
)
|
||||
|
||||
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
|
||||
|
||||
|
||||
@pytest.mark.parametrize("storage_type", ["s3"])
|
||||
def test_filesystem_cache(started_cluster, storage_type):
|
||||
instance = started_cluster.instances["node1"]
|
||||
spark = started_cluster.spark_session
|
||||
TABLE_NAME = "test_filesystem_cache_" + storage_type + "_" + get_uuid_str()
|
||||
|
||||
write_iceberg_from_df(
|
||||
spark,
|
||||
generate_data(spark, 0, 10),
|
||||
TABLE_NAME,
|
||||
mode="overwrite",
|
||||
format_version="1",
|
||||
partition_by="a",
|
||||
)
|
||||
|
||||
default_upload_directory(
|
||||
started_cluster,
|
||||
storage_type,
|
||||
f"/iceberg_data/default/{TABLE_NAME}/",
|
||||
f"/iceberg_data/default/{TABLE_NAME}/",
|
||||
)
|
||||
|
||||
create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster)
|
||||
|
||||
query_id = f"{TABLE_NAME}-{uuid.uuid4()}"
|
||||
instance.query(
|
||||
f"SELECT * FROM {TABLE_NAME} SETTINGS filesystem_cache_name = 'cache1'",
|
||||
query_id=query_id,
|
||||
)
|
||||
|
||||
instance.query("SYSTEM FLUSH LOGS")
|
||||
|
||||
count = int(
|
||||
instance.query(
|
||||
f"SELECT ProfileEvents['CachedReadBufferCacheWriteBytes'] FROM system.query_log WHERE query_id = '{query_id}' AND type = 'QueryFinish'"
|
||||
)
|
||||
)
|
||||
assert 0 < int(
|
||||
instance.query(
|
||||
f"SELECT ProfileEvents['S3GetObject'] FROM system.query_log WHERE query_id = '{query_id}' AND type = 'QueryFinish'"
|
||||
)
|
||||
)
|
||||
|
||||
query_id = f"{TABLE_NAME}-{uuid.uuid4()}"
|
||||
instance.query(
|
||||
f"SELECT * FROM {TABLE_NAME} SETTINGS filesystem_cache_name = 'cache1'",
|
||||
query_id=query_id,
|
||||
)
|
||||
|
||||
instance.query("SYSTEM FLUSH LOGS")
|
||||
|
||||
assert count == int(
|
||||
instance.query(
|
||||
f"SELECT ProfileEvents['CachedReadBufferReadFromCacheBytes'] FROM system.query_log WHERE query_id = '{query_id}' AND type = 'QueryFinish'"
|
||||
)
|
||||
)
|
||||
assert 0 == int(
|
||||
instance.query(
|
||||
f"SELECT ProfileEvents['S3GetObject'] FROM system.query_log WHERE query_id = '{query_id}' AND type = 'QueryFinish'"
|
||||
)
|
||||
)
|
||||
|
@ -0,0 +1,8 @@
|
||||
<clickhouse>
|
||||
<filesystem_caches>
|
||||
<cache1>
|
||||
<max_size>1Gi</max_size>
|
||||
<path>cache1</path>
|
||||
</cache1>
|
||||
</filesystem_caches>
|
||||
</clickhouse>
|
@ -56,6 +56,7 @@ def started_cluster():
|
||||
"configs/named_collections.xml",
|
||||
"configs/schema_cache.xml",
|
||||
"configs/blob_log.xml",
|
||||
"configs/filesystem_caches.xml",
|
||||
],
|
||||
user_configs=[
|
||||
"configs/access.xml",
|
||||
@ -2394,3 +2395,61 @@ def test_respect_object_existence_on_partitioned_write(started_cluster):
|
||||
)
|
||||
|
||||
assert int(result) == 44
|
||||
|
||||
|
||||
def test_filesystem_cache(started_cluster):
|
||||
id = uuid.uuid4()
|
||||
bucket = started_cluster.minio_bucket
|
||||
instance = started_cluster.instances["dummy"]
|
||||
table_name = f"test_filesystem_cache-{uuid.uuid4()}"
|
||||
|
||||
instance.query(
|
||||
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{table_name}.tsv', auto, 'x UInt64') select number from numbers(100) SETTINGS s3_truncate_on_insert=1"
|
||||
)
|
||||
|
||||
query_id = f"{table_name}-{uuid.uuid4()}"
|
||||
instance.query(
|
||||
f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{table_name}.tsv') SETTINGS filesystem_cache_name = 'cache1', enable_filesystem_cache=1",
|
||||
query_id=query_id,
|
||||
)
|
||||
|
||||
instance.query("SYSTEM FLUSH LOGS")
|
||||
|
||||
count = int(
|
||||
instance.query(
|
||||
f"SELECT ProfileEvents['CachedReadBufferCacheWriteBytes'] FROM system.query_log WHERE query_id = '{query_id}' AND type = 'QueryFinish'"
|
||||
)
|
||||
)
|
||||
|
||||
assert count == 290
|
||||
assert 0 < int(
|
||||
instance.query(
|
||||
f"SELECT ProfileEvents['S3GetObject'] FROM system.query_log WHERE query_id = '{query_id}' AND type = 'QueryFinish'"
|
||||
)
|
||||
)
|
||||
|
||||
instance.query("SYSTEM DROP SCHEMA CACHE")
|
||||
|
||||
query_id = f"{table_name}-{uuid.uuid4()}"
|
||||
instance.query(
|
||||
f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{table_name}.tsv') SETTINGS filesystem_cache_name = 'cache1', enable_filesystem_cache=1",
|
||||
query_id=query_id,
|
||||
)
|
||||
|
||||
instance.query("SYSTEM FLUSH LOGS")
|
||||
|
||||
assert count * 2 == int(
|
||||
instance.query(
|
||||
f"SELECT ProfileEvents['CachedReadBufferReadFromCacheBytes'] FROM system.query_log WHERE query_id = '{query_id}' AND type = 'QueryFinish'"
|
||||
)
|
||||
)
|
||||
assert 0 == int(
|
||||
instance.query(
|
||||
f"SELECT ProfileEvents['CachedReadBufferCacheWriteBytes'] FROM system.query_log WHERE query_id = '{query_id}' AND type = 'QueryFinish'"
|
||||
)
|
||||
)
|
||||
assert 0 == int(
|
||||
instance.query(
|
||||
f"SELECT ProfileEvents['S3GetObject'] FROM system.query_log WHERE query_id = '{query_id}' AND type = 'QueryFinish'"
|
||||
)
|
||||
)
|
||||
|
@ -9,7 +9,7 @@ system flush logs;
|
||||
drop table if exists logs;
|
||||
create view logs as select * from system.text_log where now() - toIntervalMinute(120) < event_time;
|
||||
|
||||
SET max_rows_to_read = 0;
|
||||
SET max_rows_to_read = 0; -- system.text_log can be really big
|
||||
|
||||
-- Check that we don't have too many messages formatted with fmt::runtime or strings concatenation.
|
||||
-- 0.001 threshold should be always enough, the value was about 0.00025
|
||||
|
@ -144,8 +144,7 @@ echo 9
|
||||
$CLICKHOUSE_CLIENT \
|
||||
--server_logs_file=/dev/null \
|
||||
--query="SELECT if( count() > 0, 'text_log non empty', 'text_log empty') FROM system.text_log WHERE event_date >= yesterday() and message like '%find_me%';
|
||||
select * from system.text_log where event_date >= yesterday() and message like '%TOPSECRET=TOPSECRET%';" --ignore-error
|
||||
|
||||
select * from system.text_log where event_date >= yesterday() and message like '%TOPSECRET=TOPSECRET%' SETTINGS max_rows_to_read = 0" --ignore-error
|
||||
echo 'finish'
|
||||
rm -f "$tmp_file" >/dev/null 2>&1
|
||||
rm -f "$tmp_file2" >/dev/null 2>&1
|
||||
|
@ -6,12 +6,12 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
|
||||
${CLICKHOUSE_CLIENT} --query="SELECT 6103"
|
||||
|
||||
for (( i=1; i <= 50; i++ ))
|
||||
for (( i=1; i <= 50; i++ ))
|
||||
do
|
||||
|
||||
${CLICKHOUSE_CLIENT} --query="SYSTEM FLUSH LOGS"
|
||||
sleep 0.1;
|
||||
if [[ $($CLICKHOUSE_CURL -sS "$CLICKHOUSE_URL" -d "SELECT count() > 0 FROM system.text_log WHERE position(system.text_log.message, 'SELECT 6103') > 0 AND event_date >= yesterday()") == 1 ]]; then echo 1; exit; fi;
|
||||
if [[ $($CLICKHOUSE_CURL -sS "$CLICKHOUSE_URL" -d "SELECT count() > 0 FROM system.text_log WHERE position(system.text_log.message, 'SELECT 6103') > 0 AND event_date >= yesterday() SETTINGS max_rows_to_read = 0") == 1 ]]; then echo 1; exit; fi;
|
||||
|
||||
done;
|
||||
|
||||
|
@ -1,5 +1,7 @@
|
||||
-- Tags: zookeeper
|
||||
|
||||
SET max_rows_to_read = 0; -- system.text_log can be really big
|
||||
|
||||
create table rmt1 (d DateTime, n int) engine=ReplicatedMergeTree('/test/01165/{database}/rmt', '1') order by n partition by toYYYYMMDD(d);
|
||||
create table rmt2 (d DateTime, n int) engine=ReplicatedMergeTree('/test/01165/{database}/rmt', '2') order by n partition by toYYYYMMDD(d);
|
||||
|
||||
|
@ -2,5 +2,6 @@ SeLeCt 'ab
|
||||
cd' /* hello */ -- world
|
||||
, 1;
|
||||
|
||||
SET max_rows_to_read = 0; -- system.text_log can be really big
|
||||
SYSTEM FLUSH LOGS;
|
||||
SELECT extract(message, 'SeL.+?;') FROM system.text_log WHERE event_date >= yesterday() AND message LIKE '%SeLeCt \'ab\n%' ORDER BY event_time DESC LIMIT 1 FORMAT TSVRaw;
|
||||
|
@ -30,7 +30,7 @@ $CLICKHOUSE_CLIENT --insert_keeper_fault_injection_probability=0 --max_block_siz
|
||||
# Now wait for cleanup thread
|
||||
for _ in {1..60}; do
|
||||
$CLICKHOUSE_CLIENT --query "SYSTEM FLUSH LOGS"
|
||||
[[ $($CLICKHOUSE_CLIENT --query "SELECT sum(toUInt32(extract(message, 'Removed (\d+) old log entries'))) FROM system.text_log WHERE event_date >= yesterday() AND logger_name LIKE '%' || '$CLICKHOUSE_DATABASE' || '%r1%(ReplicatedMergeTreeCleanupThread)%' AND message LIKE '%Removed % old log entries%'") -gt $((SCALE - 10)) ]] && break;
|
||||
[[ $($CLICKHOUSE_CLIENT --query "SELECT sum(toUInt32(extract(message, 'Removed (\d+) old log entries'))) FROM system.text_log WHERE event_date >= yesterday() AND logger_name LIKE '%' || '$CLICKHOUSE_DATABASE' || '%r1%(ReplicatedMergeTreeCleanupThread)%' AND message LIKE '%Removed % old log entries%' SETTINGS max_rows_to_read = 0") -gt $((SCALE - 10)) ]] && break;
|
||||
sleep 1
|
||||
done
|
||||
|
||||
|
@ -72,6 +72,6 @@ ${CLICKHOUSE_CLIENT} --query_id "$query_id" --query "select i from simple where
|
||||
# We have to search the server's error log because the following warning message
|
||||
# is generated during pipeline destruction and thus is not sent to the client.
|
||||
${CLICKHOUSE_CLIENT} --query "system flush logs"
|
||||
if [[ $(${CLICKHOUSE_CLIENT} --query "select count() > 0 from system.text_log where query_id = '$query_id' and level = 'Warning' and message like '%We have query_id removed but it\'s not recorded. This is a bug%' format TSVRaw") == 1 ]]; then echo "We have query_id removed but it's not recorded. This is a bug." >&2; exit 1; fi
|
||||
if [[ $(${CLICKHOUSE_CLIENT} --query "select count() > 0 from system.text_log where query_id = '$query_id' and level = 'Warning' and message like '%We have query_id removed but it\'s not recorded. This is a bug%' format TSVRaw SETTINGS max_rows_to_read = 0") == 1 ]]; then echo "We have query_id removed but it's not recorded. This is a bug." >&2; exit 1; fi
|
||||
|
||||
${CLICKHOUSE_CLIENT} --query "drop table simple"
|
||||
|
@ -35,7 +35,7 @@ $CLICKHOUSE_CLIENT -m -q "
|
||||
-- OPTIMIZE TABLE x FINAL will be done in background
|
||||
-- attach to it's log, via table UUID in query_id (see merger/mutator code).
|
||||
create materialized view this_text_log engine=Memory() as
|
||||
select * from system.text_log where query_id like '%${ttl_02262_uuid}%';
|
||||
select * from system.text_log where query_id like '%${ttl_02262_uuid}%' SETTINGS max_rows_to_read = 0;
|
||||
|
||||
optimize table ttl_02262 final;
|
||||
system flush logs;
|
||||
|
@ -64,5 +64,6 @@ drop table rmt;
|
||||
drop table rmt2;
|
||||
|
||||
system flush logs;
|
||||
SET max_rows_to_read = 0; -- system.text_log can be really big
|
||||
select count() > 0 from system.text_log where yesterday() <= event_date and logger_name like '%' || currentDatabase() || '%' and message like '%Removing % parts from filesystem (concurrently): Parts:%';
|
||||
select count() > 1, countDistinct(thread_id) > 1 from system.text_log where yesterday() <= event_date and logger_name like '%' || currentDatabase() || '%' and message like '%Removing % parts in blocks range%';
|
||||
|
@ -99,4 +99,4 @@ $CLICKHOUSE_CLIENT -q 'system flush logs'
|
||||
$CLICKHOUSE_CLIENT -q "select count() > 0 from system.text_log where event_date >= yesterday() and query_id like '$TEST_MARK%' and (
|
||||
message_format_string in ('Unexpected end of file while reading chunk header of HTTP chunked data', 'Unexpected EOF, got {} of {} bytes',
|
||||
'Query was cancelled or a client has unexpectedly dropped the connection') or
|
||||
message like '%Connection reset by peer%' or message like '%Broken pipe, while writing to socket%')"
|
||||
message like '%Connection reset by peer%' or message like '%Broken pipe, while writing to socket%') SETTINGS max_rows_to_read = 0"
|
||||
|
@ -15,6 +15,7 @@ alter table t materialize projection p_norm settings mutations_sync = 1;
|
||||
|
||||
SYSTEM FLUSH LOGS;
|
||||
|
||||
SET max_rows_to_read = 0; -- system.text_log can be really big
|
||||
SELECT * FROM system.text_log WHERE event_time >= now() - 30 and level == 'Error' and message like '%BAD_DATA_PART_NAME%'and message like '%p_norm%';
|
||||
|
||||
DROP TABLE IF EXISTS t;
|
||||
|
@ -12,5 +12,5 @@ do
|
||||
query_id=$(echo "select queryID() from (select sum(s), k from remote('127.0.0.{1,2}', view(select sum(number) s, bitAnd(number, 3) k from numbers_mt(1000000) group by k)) group by k) limit 1 settings group_by_two_level_threshold=1, max_threads=3, prefer_localhost_replica=1" | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}" --data-binary @- 2>&1)
|
||||
|
||||
${CLICKHOUSE_CLIENT} --query="system flush logs"
|
||||
${CLICKHOUSE_CLIENT} --query="select count() from system.text_log where event_date >= yesterday() and query_id = '${query_id}' and message like '%Converting aggregation data to two-level%'" | grep -P '^6$' && break;
|
||||
${CLICKHOUSE_CLIENT} --query="select count() from system.text_log where event_date >= yesterday() and query_id = '${query_id}' and message like '%Converting aggregation data to two-level%' SETTINGS max_rows_to_read = 0" | grep -P '^6$' && break;
|
||||
done
|
||||
|
@ -48,6 +48,7 @@ $CLICKHOUSE_CLIENT --query "
|
||||
SELECT 'id_' || splitByChar('_', query_id)[1] AS id FROM system.text_log
|
||||
WHERE query_id LIKE '%$query_id_suffix' AND message LIKE '%$message%'
|
||||
ORDER BY id
|
||||
SETTINGS max_rows_to_read = 0
|
||||
"
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS t_async_insert_fallback"
|
||||
|
@ -30,6 +30,7 @@ SELECT count() FROM 02581_trips SETTINGS select_sequential_consistency = 1;
|
||||
DELETE FROM 02581_trips WHERE id IN (SELECT (number*10 + 9)::UInt32 FROM numbers(10000000)) SETTINGS lightweight_deletes_sync = 2;
|
||||
SELECT count(), _part from 02581_trips WHERE description = '' GROUP BY _part ORDER BY _part SETTINGS select_sequential_consistency=1;
|
||||
|
||||
SET max_rows_to_read = 0; -- system.text_log can be really big
|
||||
SYSTEM FLUSH LOGS;
|
||||
-- Check that in every mutation there were parts that built sets (log messages like 'Created Set with 10000000 entries from 10000000 rows in 0.388989187 sec.' )
|
||||
-- and parts that shared sets (log messages like 'Got set from cache in 0.388930505 sec.' )
|
||||
|
@ -58,6 +58,7 @@ WHERE
|
||||
SETTINGS mutations_sync=2;
|
||||
SELECT count() from 02581_trips WHERE description = '';
|
||||
|
||||
SET max_rows_to_read = 0; -- system.text_log can be really big
|
||||
SYSTEM FLUSH LOGS;
|
||||
-- Check that in every mutation there were parts that built sets (log messages like 'Created Set with 10000000 entries from 10000000 rows in 0.388989187 sec.' )
|
||||
-- and parts that shared sets (log messages like 'Got set from cache in 0.388930505 sec.' )
|
||||
|
@ -30,5 +30,5 @@ ORDER BY column;
|
||||
DROP TABLE t_sparse_columns_clear SYNC;
|
||||
|
||||
SYSTEM FLUSH LOGS;
|
||||
|
||||
SET max_rows_to_read = 0; -- system.text_log can be really big
|
||||
SELECT count(), groupArray(message) FROM system.text_log WHERE logger_name LIKE '%' || currentDatabase() || '.t_sparse_columns_clear' || '%' AND level = 'Error';
|
||||
|
@ -1,5 +1,6 @@
|
||||
-- Tags: no-parallel
|
||||
|
||||
SET max_rows_to_read = 0; -- system.text_log can be really big
|
||||
SELECT 'Hello', throwIf(1); -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
|
||||
SYSTEM FLUSH LOGS;
|
||||
|
||||
|
@ -1,2 +1,3 @@
|
||||
SYSTEM FLUSH LOGS;
|
||||
SET max_rows_to_read = 0; -- system.text_log can be really big
|
||||
SELECT count() > 0 FROM system.text_log WHERE event_date >= yesterday() AND message LIKE '%Starting ClickHouse%';
|
||||
|
@ -8,6 +8,7 @@ SELECT count() FROM clusterAllReplicas('test_cluster_two_shard_three_replicas_lo
|
||||
SYSTEM FLUSH LOGS;
|
||||
|
||||
SET enable_parallel_replicas=0;
|
||||
SET max_rows_to_read = 0; -- system.text_log can be really big
|
||||
SELECT count() > 0 FROM system.text_log
|
||||
WHERE query_id in (select query_id from system.query_log where current_database = currentDatabase() AND log_comment = '02875_190aed82-2423-413b-ad4c-24dcca50f65b')
|
||||
AND message LIKE '%Parallel reading from replicas is disabled for cluster%';
|
||||
|
@ -8,6 +8,7 @@ SELECT count() FROM remote('127.0.0.{1..6}', currentDatabase(), tt) settings log
|
||||
SYSTEM FLUSH LOGS;
|
||||
|
||||
SET enable_parallel_replicas=0;
|
||||
SET max_rows_to_read = 0; -- system.text_log can be really big
|
||||
SELECT count() > 0 FROM system.text_log
|
||||
WHERE query_id in (select query_id from system.query_log where current_database = currentDatabase() AND log_comment = '02875_89f3c39b-1919-48cb-b66e-ef9904e73146')
|
||||
AND message LIKE '%Parallel reading from replicas is disabled for cluster%';
|
||||
|
@ -34,6 +34,7 @@ SELECT count() FROM checksums_r3;
|
||||
|
||||
SYSTEM FLUSH LOGS;
|
||||
|
||||
SET max_rows_to_read = 0; -- system.text_log can be really big
|
||||
SELECT * FROM system.text_log WHERE event_time >= now() - INTERVAL 120 SECOND and level == 'Error' and message like '%CHECKSUM_DOESNT_MATCH%' and logger_name like ('%' || currentDatabase() || '%checksums_r%');
|
||||
|
||||
DROP TABLE IF EXISTS checksums_r3;
|
||||
|
@ -21,6 +21,7 @@ SELECT count(), min(k), max(k), avg(k) FROM t1 SETTINGS log_comment='02898_defau
|
||||
|
||||
-- check logs
|
||||
SYSTEM FLUSH LOGS;
|
||||
SET max_rows_to_read = 0; -- system.text_log can be really big
|
||||
SELECT count() > 0 FROM system.text_log
|
||||
WHERE query_id in (select query_id from system.query_log where current_database = currentDatabase() AND log_comment='02898_default_190aed82-2423-413b-ad4c-24dcca50f65b')
|
||||
AND message LIKE '%Total rows to read: 3000%' SETTINGS enable_parallel_replicas=0;
|
||||
|
@ -56,7 +56,8 @@ for _ in {0..50}; do
|
||||
(
|
||||
(logger_name = 'MergeTreeBackgroundExecutor' and message like '%{$table_uuid::$part_name}%No active replica has part $part_name or covering part%') or
|
||||
(logger_name like '$table_uuid::$part_name (MergeFromLogEntryTask)' and message like '%No active replica has part $part_name or covering part%')
|
||||
);
|
||||
)
|
||||
SETTINGS max_rows_to_read = 0;
|
||||
")
|
||||
if [[ $no_active_repilica_messages -gt 0 ]]; then
|
||||
break
|
||||
@ -78,5 +79,6 @@ $CLICKHOUSE_CLIENT -m -q "
|
||||
(logger_name = 'MergeTreeBackgroundExecutor' and message like '%{$table_uuid::$part_name}%No active replica has part $part_name or covering part%') or
|
||||
(logger_name like '$table_uuid::$part_name (MergeFromLogEntryTask)' and message like '%No active replica has part $part_name or covering part%')
|
||||
)
|
||||
group by level;
|
||||
group by level
|
||||
SETTINGS max_rows_to_read = 0;
|
||||
"
|
||||
|
@ -11,7 +11,7 @@ SET cluster_for_parallel_replicas='parallel_replicas';
|
||||
SELECT count() FROM test_parallel_replicas_settings WHERE NOT ignore(*) settings log_comment='0_f621c4f2-4da7-4a7c-bb6d-052c442d0f7f';
|
||||
|
||||
SYSTEM FLUSH LOGS;
|
||||
|
||||
SET max_rows_to_read = 0; -- system.text_log can be really big
|
||||
SELECT count() > 0 FROM system.text_log
|
||||
WHERE yesterday() <= event_date
|
||||
AND query_id in (select query_id from system.query_log where current_database=currentDatabase() AND log_comment='0_f621c4f2-4da7-4a7c-bb6d-052c442d0f7f')
|
||||
|
@ -6,6 +6,7 @@ select conut(); -- { serverError UNKNOWN_FUNCTION }
|
||||
|
||||
system flush logs;
|
||||
|
||||
SET max_rows_to_read = 0; -- system.text_log can be really big
|
||||
select count() > 0 from system.text_log where message_format_string = 'Peak memory usage{}: {}.' and value1 is not null and value2 like '% MiB';
|
||||
|
||||
select count() > 0 from system.text_log where level = 'Error' and message_format_string = 'Unknown {}{} identifier {} in scope {}{}' and value1 = 'expression' and value3 = '`count`' and value4 = 'SELECT count';
|
||||
|
@ -11,6 +11,7 @@ system disable failpoint replicated_sends_failpoint;
|
||||
system sync replica data_r2;
|
||||
|
||||
system flush logs;
|
||||
SET max_rows_to_read = 0; -- system.text_log can be really big
|
||||
select event_time_microseconds, logger_name, message from system.text_log where level = 'Error' and message like '%Malformed chunked encoding%' order by 1 format LineAsString;
|
||||
|
||||
-- { echoOn }
|
||||
|
@ -24,7 +24,7 @@ SELECT count() FROM t_ind_merge_1 WHERE b < 100 SETTINGS force_data_skipping_ind
|
||||
EXPLAIN indexes = 1 SELECT count() FROM t_ind_merge_1 WHERE b < 100;
|
||||
|
||||
SYSTEM FLUSH LOGS;
|
||||
|
||||
SET max_rows_to_read = 0; -- system.text_log can be really big
|
||||
WITH
|
||||
(SELECT uuid FROM system.tables WHERE database = currentDatabase() AND table = 't_ind_merge_1') AS uuid,
|
||||
extractAllGroupsVertical(message, 'containing (\\d+) columns \((\\d+) merged, (\\d+) gathered\)')[1] AS groups
|
||||
|
@ -26,6 +26,7 @@ INSERT INTO t_ind_merge_2 SELECT number, number, rand(), rand(), rand(), rand()
|
||||
|
||||
OPTIMIZE TABLE t_ind_merge_2 FINAL;
|
||||
SYSTEM FLUSH LOGS;
|
||||
SET max_rows_to_read = 0; -- system.text_log can be really big
|
||||
|
||||
--- merged: a, c, d; gathered: b, e, f
|
||||
WITH
|
||||
|
@ -8,7 +8,7 @@ PRIMARY KEY (town, date)
|
||||
PARTITION BY toYear(date)
|
||||
COMMENT 'test' -- to end ENGINE definition, so SETTINGS will be in the query level
|
||||
SETTINGS
|
||||
enable_secure_identifiers=true; -- { serverError BAD_ARGUMENTS }
|
||||
enforce_strict_identifier_format=true; -- { serverError BAD_ARGUMENTS }
|
||||
DROP TABLE IF EXISTS `test_foo_#`;
|
||||
|
||||
|
||||
@ -23,7 +23,7 @@ PRIMARY KEY (town, date)
|
||||
PARTITION BY toYear(date)
|
||||
COMMENT 'test' -- to end ENGINE definition, so SETTINGS will be in the query level
|
||||
SETTINGS
|
||||
enable_secure_identifiers=true; -- { serverError BAD_ARGUMENTS }
|
||||
enforce_strict_identifier_format=true; -- { serverError BAD_ARGUMENTS }
|
||||
|
||||
DROP TABLE IF EXISTS test_foo;
|
||||
CREATE TABLE test_foo (
|
||||
@ -36,7 +36,7 @@ PRIMARY KEY (town, date)
|
||||
PARTITION BY toYear(date)
|
||||
COMMENT 'test' -- to end ENGINE definition, so SETTINGS will be in the query level
|
||||
SETTINGS
|
||||
enable_secure_identifiers=true; -- { serverError BAD_ARGUMENTS }
|
||||
enforce_strict_identifier_format=true; -- { serverError BAD_ARGUMENTS }
|
||||
|
||||
DROP TABLE IF EXISTS test_foo;
|
||||
CREATE TABLE test_foo (
|
||||
@ -49,7 +49,7 @@ PRIMARY KEY (town, date)
|
||||
PARTITION BY toYear(date)
|
||||
COMMENT 'test' -- to end ENGINE definition, so SETTINGS will be in the query level
|
||||
SETTINGS
|
||||
enable_secure_identifiers=true; -- { serverError BAD_ARGUMENTS }
|
||||
enforce_strict_identifier_format=true; -- { serverError BAD_ARGUMENTS }
|
||||
|
||||
DROP TABLE IF EXISTS test_foo;
|
||||
CREATE TABLE test_foo (
|
||||
@ -62,11 +62,11 @@ PRIMARY KEY (town, date)
|
||||
PARTITION BY toYear(date)
|
||||
COMMENT 'test' -- to end ENGINE definition, so SETTINGS will be in the query level
|
||||
SETTINGS
|
||||
enable_secure_identifiers=true;
|
||||
enforce_strict_identifier_format=true;
|
||||
|
||||
SHOW CREATE TABLE test_foo
|
||||
SHOW CREATE TABLE test_foo
|
||||
SETTINGS
|
||||
enable_secure_identifiers=true;
|
||||
enforce_strict_identifier_format=true;
|
||||
|
||||
DROP TABLE IF EXISTS test_foo;
|
||||
CREATE TABLE test_foo (
|
||||
@ -79,13 +79,13 @@ PRIMARY KEY (town, date)
|
||||
PARTITION BY toYear(date)
|
||||
COMMENT 'test' -- to end ENGINE definition, so SETTINGS will be in the query level
|
||||
SETTINGS
|
||||
enable_secure_identifiers=true;
|
||||
enforce_strict_identifier_format=true;
|
||||
|
||||
SHOW CREATE TABLE test_foo
|
||||
SHOW CREATE TABLE test_foo
|
||||
SETTINGS
|
||||
enable_secure_identifiers=true;
|
||||
enforce_strict_identifier_format=true;
|
||||
|
||||
-- CREATE TABLE without `enable_secure_identifiers`
|
||||
-- CREATE TABLE without `enforce_strict_identifier_format`
|
||||
DROP TABLE IF EXISTS test_foo;
|
||||
CREATE TABLE `test_foo` (
|
||||
`insecure_$` Int8,
|
||||
@ -95,17 +95,17 @@ CREATE TABLE `test_foo` (
|
||||
ENGINE = MergeTree
|
||||
PRIMARY KEY (town, date)
|
||||
PARTITION BY toYear(date);
|
||||
-- Then SHOW CREATE .. with `enable_secure_identifiers`
|
||||
-- Then SHOW CREATE .. with `enforce_strict_identifier_format`
|
||||
-- While the result contains insecure identifiers (`insecure_$`), the `SHOW CREATE TABLE ...` query does not have any. So the query is expected to succeed.
|
||||
SHOW CREATE TABLE test_foo
|
||||
SHOW CREATE TABLE test_foo
|
||||
SETTINGS
|
||||
enable_secure_identifiers=true;
|
||||
enforce_strict_identifier_format=true;
|
||||
|
||||
DROP TABLE IF EXISTS test_foo;
|
||||
|
||||
-- SHOW CREATE .. query contains an insecure identifier (`test_foo$`) with `enable_secure_identifiers`
|
||||
-- SHOW CREATE .. query contains an insecure identifier (`test_foo$`) with `enforce_strict_identifier_format`
|
||||
SHOW CREATE TABLE `test_foo$`
|
||||
SETTINGS
|
||||
enable_secure_identifiers=true; -- { serverError BAD_ARGUMENTS }
|
||||
enforce_strict_identifier_format=true; -- { serverError BAD_ARGUMENTS }
|
||||
|
||||
DROP TABLE IF EXISTS test_foo;
|
||||
DROP TABLE IF EXISTS test_foo;
|
||||
|
@ -0,0 +1,3 @@
|
||||
-- https://github.com/ClickHouse/ClickHouse/issues/70569
|
||||
-- Reproduces UBSAN alert about misaligned address
|
||||
SELECT anyLast(id), anyLast(time), exponentialTimeDecayedAvg(10)(id, time) FROM values('id Int8, time DateTime', (1,1),(1,2),(2,3),(3,3),(3,5)); -- { serverError BAD_ARGUMENTS }
|
@ -15,7 +15,8 @@ check_replicas_read_in_order() {
|
||||
SELECT COUNT() > 0
|
||||
FROM system.text_log
|
||||
WHERE query_id IN (SELECT query_id FROM system.query_log WHERE query_id != '$1' AND initial_query_id = '$1' AND event_date >= yesterday())
|
||||
AND event_date >= yesterday() AND message ILIKE '%Reading%ranges in order%'"
|
||||
AND event_date >= yesterday() AND message ILIKE '%Reading%ranges in order%'
|
||||
SETTINGS max_rows_to_read=0"
|
||||
}
|
||||
|
||||
# replicas should use reading in order following initiator's decision to execute aggregation in order.
|
||||
|
Loading…
Reference in New Issue
Block a user