Merge branch 'master' into exception_message_patterns5

Alexander Tokmakov 2023-01-26 00:41:32 +01:00
commit 9b670946db
38 changed files with 1422 additions and 1133 deletions


@ -2,11 +2,11 @@
# NOTE: has nothing common with DBMS_TCP_PROTOCOL_VERSION,
# only DBMS_TCP_PROTOCOL_VERSION should be incremented on protocol changes.
SET(VERSION_REVISION 54470)
SET(VERSION_MAJOR 22)
SET(VERSION_MINOR 13)
SET(VERSION_REVISION 54471)
SET(VERSION_MAJOR 23)
SET(VERSION_MINOR 2)
SET(VERSION_PATCH 1)
SET(VERSION_GITHASH 688e488e930c83eefeac4f87c4cc029cc5b231e3)
SET(VERSION_DESCRIBE v22.13.1.1-testing)
SET(VERSION_STRING 22.13.1.1)
SET(VERSION_GITHASH dcaac47702510cc87ddf266bc524f6b7ce0a8e6e)
SET(VERSION_DESCRIBE v23.2.1.1-testing)
SET(VERSION_STRING 23.2.1.1)
# end of autochange


@ -146,6 +146,12 @@ def prepare_for_hung_check(drop_databases):
"KILL QUERY WHERE query LIKE 'SELECT URL, uniq(SearchPhrase) AS u FROM test.hits GROUP BY URL ORDER BY u %'"
)
)
# Long query from 02136_kill_scalar_queries
call_with_retry(
make_query_command(
"KILL QUERY WHERE query LIKE 'SELECT (SELECT number FROM system.numbers WHERE number = 1000000000000)%'"
)
)
if drop_databases:
for i in range(5):


@ -21,6 +21,13 @@ ENGINE = HDFS(URI, format)
`SELECT` queries, the format must be supported for input, and to perform
`INSERT` queries for output. The available formats are listed in the
[Formats](../../../interfaces/formats.md#formats) section.
- [PARTITION BY expr]
### PARTITION BY
`PARTITION BY` — Optional. In most cases, you don't need a partition key, and when you do, you generally don't need one more granular than by month. Partitioning does not speed up queries (in contrast to the ORDER BY expression). Never use partitioning that is too granular. Don't partition your data by client identifiers or names (instead, make the client identifier or name the first column in the ORDER BY expression).
For partitioning by month, use the `toYYYYMM(date_column)` expression, where `date_column` is a column with a date of the type [Date](/docs/en/sql-reference/data-types/date.md). The partition names here have the `"YYYYMM"` format.
**Example:**


@ -13,6 +13,7 @@ This engine provides integration with [Amazon S3](https://aws.amazon.com/s3/) ec
``` sql
CREATE TABLE s3_engine_table (name String, value UInt32)
ENGINE = S3(path, [aws_access_key_id, aws_secret_access_key,] format, [compression])
[PARTITION BY expr]
[SETTINGS ...]
```
@ -23,6 +24,12 @@ CREATE TABLE s3_engine_table (name String, value UInt32)
- `aws_access_key_id`, `aws_secret_access_key` - Long-term credentials for the [AWS](https://aws.amazon.com/) account user. You can use these to authenticate your requests. Parameter is optional. If credentials are not specified, they are used from the configuration file. For more information see [Using S3 for Data Storage](../mergetree-family/mergetree.md#table_engine-mergetree-s3).
- `compression` — Compression type. Supported values: `none`, `gzip/gz`, `brotli/br`, `xz/LZMA`, `zstd/zst`. Parameter is optional. By default, it will autodetect compression by file extension.
### PARTITION BY
`PARTITION BY` — Optional. In most cases, you don't need a partition key, and when you do, you generally don't need one more granular than by month. Partitioning does not speed up queries (in contrast to the ORDER BY expression). Never use partitioning that is too granular. Don't partition your data by client identifiers or names (instead, make the client identifier or name the first column in the ORDER BY expression).
For partitioning by month, use the `toYYYYMM(date_column)` expression, where `date_column` is a column with a date of the type [Date](/docs/en/sql-reference/data-types/date.md). The partition names here have the `"YYYYMM"` format.
**Example**
``` sql


@ -77,7 +77,7 @@ Use the `ORDER BY tuple()` syntax, if you do not need sorting. See [Selecting th
#### PARTITION BY
`PARTITION BY` — The [partitioning key](/docs/en/engines/table-engines/mergetree-family/custom-partitioning-key.md). Optional. In most cases you don't need partition key, and in most other cases you don't need partition key more granular than by months. Partitioning does not speed up queries (in contrast to the ORDER BY expression). You should never use too granular partitioning. Don't partition your data by client identifiers or names (instead make client identifier or name the first column in the ORDER BY expression).
`PARTITION BY` — The [partitioning key](/docs/en/engines/table-engines/mergetree-family/custom-partitioning-key.md). Optional. In most cases, you don't need a partition key, and if you do need to partition, generally you do not need a partition key more granular than by month. Partitioning does not speed up queries (in contrast to the ORDER BY expression). You should never use too granular partitioning. Don't partition your data by client identifiers or names (instead, make client identifier or name the first column in the ORDER BY expression).
For partitioning by month, use the `toYYYYMM(date_column)` expression, where `date_column` is a column with a date of the type [Date](/docs/en/sql-reference/data-types/date.md). The partition names here have the `"YYYYMM"` format.
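For illustration, a minimal sketch of a MergeTree table partitioned by month (the `visits` table and its column names are hypothetical):

``` sql
CREATE TABLE visits
(
    EventDate Date,
    CounterID UInt32,
    UserID UInt64
)
ENGINE = MergeTree
-- Partition by month: toYYYYMM(EventDate) produces values such as 202301.
PARTITION BY toYYYYMM(EventDate)
-- Keep identifiers you might be tempted to partition by at the front of the sorting key instead.
ORDER BY (CounterID, EventDate, UserID);
```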


@ -86,3 +86,9 @@ $ echo -e "1,2\n3,4" | clickhouse-local -q "CREATE TABLE table (a Int64, b Int64
- `SELECT ... SAMPLE`
- Indices
- Replication
## PARTITION BY
`PARTITION BY` — Optional. It is possible to create separate files by partitioning the data on a partition key. In most cases, you don't need a partition key, and when you do, you generally don't need one more granular than by month. Partitioning does not speed up queries (in contrast to the ORDER BY expression). Never use partitioning that is too granular. Don't partition your data by client identifiers or names (instead, make the client identifier or name the first column in the ORDER BY expression).
For partitioning by month, use the `toYYYYMM(date_column)` expression, where `date_column` is a column with a date of the type [Date](/docs/en/sql-reference/data-types/date.md). The partition names here have the `"YYYYMM"` format.
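For illustration, a minimal sketch of a File table partitioned by month, so that each month's rows are written to a separate file (the table and column names are hypothetical):

``` sql
CREATE TABLE file_by_month
(
    EventDate Date,
    Value UInt32
)
ENGINE = File(CSV)
-- One file per partition, i.e. per toYYYYMM(EventDate) value such as 202301.
PARTITION BY toYYYYMM(EventDate);

INSERT INTO file_by_month VALUES ('2023-01-15', 1), ('2023-02-03', 2);
```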


@ -96,3 +96,9 @@ SELECT * FROM url_engine_table
- `ALTER` and `SELECT...SAMPLE` operations.
- Indexes.
- Replication.
## PARTITION BY
`PARTITION BY` — Optional. It is possible to create separate files by partitioning the data on a partition key. In most cases, you don't need a partition key, and when you do, you generally don't need one more granular than by month. Partitioning does not speed up queries (in contrast to the ORDER BY expression). Never use partitioning that is too granular. Don't partition your data by client identifiers or names (instead, make the client identifier or name the first column in the ORDER BY expression).
For partitioning by month, use the `toYYYYMM(date_column)` expression, where `date_column` is a column with a date of the type [Date](/docs/en/sql-reference/data-types/date.md). The partition names here have the `"YYYYMM"` format.


@ -9,6 +9,7 @@ Columns:
- `metric` ([String](../../sql-reference/data-types/string.md)) — Metric name.
- `value` ([Float64](../../sql-reference/data-types/float.md)) — Metric value.
- `description` ([String](../../sql-reference/data-types/string.md)) — Metric description.
**Example**
@ -17,18 +18,18 @@ SELECT * FROM system.asynchronous_metrics LIMIT 10
```
``` text
┌─metric──────────────────────────────────┬──────value─┐
jemalloc.background_thread.run_interval │ 0
jemalloc.background_thread.num_runs │ 0
jemalloc.background_thread.num_threads │ 0
jemalloc.retained │ 422551552
jemalloc.mapped │ 1682989056
jemalloc.resident │ 1656446976
jemalloc.metadata_thp │ 0
jemalloc.metadata │ 10226856
UncompressedCacheCells │ 0
MarkCacheFiles │ 0
└─────────────────────────────────────────┴────────────┘
┌─metric──────────────────────────────────┬──────value─┬─description────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
AsynchronousMetricsCalculationTimeSpent │ 0.00179053 │ Time in seconds spent for calculation of asynchronous metrics (this is the overhead of asynchronous metrics).
NumberOfDetachedByUserParts │ 0 │ The total number of parts detached from MergeTree tables by users with the `ALTER TABLE DETACH` query (as opposed to unexpected, broken or ignored parts). The server does not care about detached parts and they can be removed.
NumberOfDetachedParts │ 0 │ The total number of parts detached from MergeTree tables. A part can be detached by a user with the `ALTER TABLE DETACH` query or by the server itself if the part is broken, unexpected or unneeded. The server does not care about detached parts and they can be removed.
TotalRowsOfMergeTreeTables │ 2781309 │ Total amount of rows (records) stored in all tables of MergeTree family.
TotalBytesOfMergeTreeTables │ 7741926 │ Total amount of bytes (compressed, including data and indices) stored in all tables of MergeTree family.
NumberOfTables │ 93 │ Total number of tables summed across the databases on the server, excluding the databases that cannot contain MergeTree tables. The excluded database engines are those that generate the set of tables on the fly, like `Lazy`, `MySQL`, `PostgreSQL`, `SQLite`.
NumberOfDatabases │ 6 │ Total number of databases on the server.
MaxPartCountForPartition │ 6 │ Maximum number of parts per partition across all partitions of all tables of MergeTree family. Values larger than 300 indicate misconfiguration, overload, or massive data loading.
ReplicasSumMergesInQueue │ 0 │ Sum of merge operations in the queue (still to be applied) across Replicated tables.
ReplicasSumInsertsInQueue │ 0 │ Sum of INSERT operations in the queue (still to be replicated) across Replicated tables.
└─────────────────────────────────────────┴────────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
```
**See Also**


@ -0,0 +1,74 @@
---
slug: /en/sql-reference/table-functions/mongodb
sidebar_position: 42
sidebar_label: mongodb
---
# mongodb
Allows `SELECT` queries to be performed on data that is stored on a remote MongoDB server.
**Syntax**
``` sql
mongodb(host:port, database, collection, user, password, structure [, options])
```
**Arguments**
- `host:port` — MongoDB server address.
- `database` — Remote database name.
- `collection` — Remote collection name.
- `user` — MongoDB user.
- `password` — User password.
- `structure` — The schema for the ClickHouse table returned from this function.
- `options` — MongoDB connection string options (optional parameter).
**Returned Value**
A table object with the same columns as the original MongoDB table.
**Examples**
Suppose we have a collection named `my_collection` defined in a MongoDB database named `test`, and we insert a couple of documents:
```sql
db.createUser({user:"test_user",pwd:"password",roles:[{role:"readWrite",db:"test"}]})
db.createCollection("my_collection")
db.my_collection.insertOne(
{ log_type: "event", host: "120.5.33.9", command: "check-cpu-usage -w 75 -c 90" }
)
db.my_collection.insertOne(
{ log_type: "event", host: "120.5.33.4", command: "system-check"}
)
```
Let's query the collection using the `mongodb` table function:
```sql
SELECT * FROM mongodb(
'127.0.0.1:27017',
'test',
'my_collection',
'test_user',
'password',
'log_type String, host String, command String',
'connectTimeoutMS=10000'
)
```
**See Also**
- [The `MongoDB` table engine](../../engines/table-engines/integrations/mongodb.md)
- [Using MongoDB as a dictionary source](../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-sources/#mongodb)


@ -66,6 +66,9 @@ AsynchronousMetrics::AsynchronousMetrics(
openFileIfExists("/proc/uptime", uptime);
openFileIfExists("/proc/net/dev", net_dev);
openFileIfExists("/sys/fs/cgroup/memory/memory.limit_in_bytes", cgroupmem_limit_in_bytes);
openFileIfExists("/sys/fs/cgroup/memory/memory.usage_in_bytes", cgroupmem_usage_in_bytes);
openSensors();
openBlockDevices();
openEDAC();
@ -879,6 +882,35 @@ void AsynchronousMetrics::update(TimePoint update_time)
}
}
if (cgroupmem_limit_in_bytes && cgroupmem_usage_in_bytes)
{
try {
cgroupmem_limit_in_bytes->rewind();
cgroupmem_usage_in_bytes->rewind();
uint64_t cgroup_mem_limit_in_bytes = 0;
uint64_t cgroup_mem_usage_in_bytes = 0;
readText(cgroup_mem_limit_in_bytes, *cgroupmem_limit_in_bytes);
readText(cgroup_mem_usage_in_bytes, *cgroupmem_usage_in_bytes);
if (cgroup_mem_limit_in_bytes && cgroup_mem_usage_in_bytes)
{
new_values["CgroupMemoryTotal"] = { cgroup_mem_limit_in_bytes, "The total amount of memory in cgroup, in bytes." };
new_values["CgroupMemoryUsed"] = { cgroup_mem_usage_in_bytes, "The amount of memory used in cgroup, in bytes." };
}
else
{
LOG_DEBUG(log, "Cannot read statistics about the cgroup memory total and used. Total got '{}', Used got '{}'.",
cgroup_mem_limit_in_bytes, cgroup_mem_usage_in_bytes);
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
if (meminfo)
{
try


@ -108,6 +108,9 @@ private:
std::optional<ReadBufferFromFilePRead> uptime;
std::optional<ReadBufferFromFilePRead> net_dev;
std::optional<ReadBufferFromFilePRead> cgroupmem_limit_in_bytes;
std::optional<ReadBufferFromFilePRead> cgroupmem_usage_in_bytes;
std::vector<std::unique_ptr<ReadBufferFromFilePRead>> thermal;
std::unordered_map<String /* device name */,


@ -379,14 +379,18 @@ void DatabaseOnDisk::renameTable(
if (dictionary && table && !table->isDictionary())
throw Exception(ErrorCodes::INCORRECT_QUERY, "Use RENAME/EXCHANGE TABLE (instead of RENAME/EXCHANGE DICTIONARY) for tables");
/// We have to lock the table before detaching, because otherwise lockExclusively will throw. But the table may not exist.
bool need_lock = table != nullptr;
if (need_lock)
table_lock = table->lockExclusively(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
detachTable(local_context, table_name);
if (!need_lock)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Table was detached without locking, it's a bug");
UUID prev_uuid = UUIDHelpers::Nil;
try
{
table_lock = table->lockExclusively(
local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
table_metadata_path = getObjectMetadataPath(table_name);
attach_query = parseQueryFromMetadata(log, local_context, table_metadata_path);
auto & create = attach_query->as<ASTCreateQuery &>();


@ -236,6 +236,7 @@ StoragePtr DatabaseWithOwnTablesBase::detachTableUnlocked(const String & table_n
backQuote(database_name), backQuote(table_name));
res = it->second;
tables.erase(it);
res->is_detached = true;
auto table_id = res->getStorageID();
if (table_id.hasUUID())
@ -272,6 +273,10 @@ void DatabaseWithOwnTablesBase::attachTableUnlocked(const String & table_name, c
DatabaseCatalog::instance().removeUUIDMapping(table_id.uuid);
throw Exception(ErrorCodes::TABLE_ALREADY_EXISTS, "Table {} already exists.", table_id.getFullTableName());
}
/// It is important to reset is_detached here, since in the case of RENAME in a
/// non-Atomic database, is_detached is set to true before the RENAME.
table->is_detached = false;
}
void DatabaseWithOwnTablesBase::shutdown()


@ -448,7 +448,7 @@ void DatabaseMySQL::detachTablePermanently(ContextPtr, const String & table_name
remove_or_detach_tables.erase(table_name);
throw;
}
table_iter->second.second->is_dropped = true;
table_iter->second.second->is_detached = true;
}
void DatabaseMySQL::dropTable(ContextPtr local_context, const String & table_name, bool /*sync*/)

File diff suppressed because it is too large

File diff suppressed because it is too large


@ -0,0 +1,13 @@
#include <Dictionaries/RangeHashedDictionary.h>
/// RangeHashedDictionary is instantiated from two files
/// RangeHashedDictionarySimple.cpp and RangeHashedDictionaryComplex.cpp
/// to better parallelize the build procedure and avoid MSan build failure
/// due to excessive resource consumption.
namespace DB
{
template class RangeHashedDictionary<DictionaryKeyType::Complex>;
}


@ -0,0 +1,13 @@
#include <Dictionaries/RangeHashedDictionary.h>
/// RangeHashedDictionary is instantiated from two files
/// RangeHashedDictionarySimple.cpp and RangeHashedDictionaryComplex.cpp
/// to better parallelize the build procedure and avoid MSan build failure
/// due to excessive resource consumption.
namespace DB
{
template class RangeHashedDictionary<DictionaryKeyType::Simple>;
}


@ -0,0 +1,17 @@
#include "FunctionsHashing.h"
#include <Functions/FunctionFactory.h>
/// FunctionsHashing instantiations are separated into files FunctionsHashing*.cpp
/// to better parallelize the build procedure and avoid MSan build failure
/// due to excessive resource consumption.
namespace DB
{
REGISTER_FUNCTION(HashingInt)
{
factory.registerFunction<FunctionIntHash32>();
factory.registerFunction<FunctionIntHash64>();
}
}


@ -2,6 +2,9 @@
#include <Functions/FunctionFactory.h>
/// FunctionsHashing instantiations are separated into files FunctionsHashing*.cpp
/// to better parallelize the build procedure and avoid MSan build failure
/// due to excessive resource consumption.
namespace DB
{
@ -14,18 +17,10 @@ REGISTER_FUNCTION(Hashing)
factory.registerFunction<FunctionFarmFingerprint64>();
factory.registerFunction<FunctionFarmHash64>();
factory.registerFunction<FunctionMetroHash64>();
factory.registerFunction<FunctionIntHash32>();
factory.registerFunction<FunctionIntHash64>();
factory.registerFunction<FunctionURLHash>();
factory.registerFunction<FunctionJavaHash>();
factory.registerFunction<FunctionJavaHashUTF16LE>();
factory.registerFunction<FunctionHiveHash>();
factory.registerFunction<FunctionMurmurHash2_32>();
factory.registerFunction<FunctionMurmurHash2_64>();
factory.registerFunction<FunctionMurmurHash3_32>();
factory.registerFunction<FunctionMurmurHash3_64>();
factory.registerFunction<FunctionMurmurHash3_128>();
factory.registerFunction<FunctionGccMurmurHash>();
factory.registerFunction<FunctionXxHash32>();
factory.registerFunction<FunctionXxHash64>();


@ -0,0 +1,21 @@
#include "FunctionsHashing.h"
#include <Functions/FunctionFactory.h>
/// FunctionsHashing instantiations are separated into files FunctionsHashing*.cpp
/// to better parallelize the build procedure and avoid MSan build failure
/// due to excessive resource consumption.
namespace DB
{
REGISTER_FUNCTION(HashingMurmur)
{
factory.registerFunction<FunctionMurmurHash2_32>();
factory.registerFunction<FunctionMurmurHash2_64>();
factory.registerFunction<FunctionMurmurHash3_32>();
factory.registerFunction<FunctionMurmurHash3_64>();
factory.registerFunction<FunctionMurmurHash3_128>();
factory.registerFunction<FunctionGccMurmurHash>();
}
}


@ -5,8 +5,9 @@
#include "FunctionsHashing.h"
#include <Functions/FunctionFactory.h>
/// SSL functions are located in the separate FunctionsHashingSSL.cpp file
/// to lower the compilation time of FunctionsHashing.cpp
/// FunctionsHashing instantiations are separated into files FunctionsHashing*.cpp
/// to better parallelize the build procedure and avoid MSan build failure
/// due to excessive resource consumption.
namespace DB
{


@ -287,6 +287,10 @@ BlockIO InterpreterDropQuery::executeToTemporaryTable(const String & table_name,
table->drop();
table->is_dropped = true;
}
else if (kind == ASTDropQuery::Kind::Detach)
{
table->is_detached = true;
}
}
}


@ -42,15 +42,17 @@ StorageHDFSCluster::StorageHDFSCluster(
const String & format_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const String & compression_method_)
const String & compression_method_,
bool structure_argument_was_provided_)
: IStorageCluster(table_id_)
, cluster_name(cluster_name_)
, uri(uri_)
, format_name(format_name_)
, compression_method(compression_method_)
, structure_argument_was_provided(structure_argument_was_provided_)
{
context_->getRemoteHostFilter().checkURL(Poco::URI(uri_));
checkHDFSURL(uri_);
context_->getRemoteHostFilter().checkURL(Poco::URI(uri_));
StorageInMemoryMetadata storage_metadata;
@ -58,7 +60,6 @@ StorageHDFSCluster::StorageHDFSCluster(
{
auto columns = StorageHDFS::getTableStructureFromData(format_name, uri_, compression_method, context_);
storage_metadata.setColumns(columns);
add_columns_structure_to_query = true;
}
else
storage_metadata.setColumns(columns_);
@ -91,7 +92,7 @@ Pipe StorageHDFSCluster::read(
const bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
auto query_to_send = query_info.original_query->clone();
if (add_columns_structure_to_query)
if (!structure_argument_was_provided)
addColumnsStructureToQueryWithClusterEngine(
query_to_send, StorageDictionary::generateNamesAndTypesDescription(storage_snapshot->metadata->getColumns().getAll()), 3, getName());


@ -28,7 +28,8 @@ public:
const String & format_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const String & compression_method_);
const String & compression_method_,
bool structure_argument_was_provided_);
std::string getName() const override { return "HDFSCluster"; }
@ -48,7 +49,7 @@ private:
String uri;
String format_name;
String compression_method;
bool add_columns_structure_to_query = false;
bool structure_argument_was_provided;
};


@ -50,10 +50,10 @@ TableLockHolder IStorage::lockForShare(const String & query_id, const std::chron
{
TableLockHolder result = tryLockTimed(drop_lock, RWLockImpl::Read, query_id, acquire_timeout);
if (is_dropped)
if (is_dropped || is_detached)
{
auto table_id = getStorageID();
throw Exception(ErrorCodes::TABLE_IS_DROPPED, "Table {}.{} is dropped", table_id.database_name, table_id.table_name);
throw Exception(ErrorCodes::TABLE_IS_DROPPED, "Table {}.{} is dropped or detached", table_id.database_name, table_id.table_name);
}
return result;
}
@ -62,7 +62,7 @@ TableLockHolder IStorage::tryLockForShare(const String & query_id, const std::ch
{
TableLockHolder result = tryLockTimed(drop_lock, RWLockImpl::Read, query_id, acquire_timeout);
if (is_dropped)
if (is_dropped || is_detached)
{
// Table was dropped while acquiring the lock
result = nullptr;
@ -81,7 +81,7 @@ IStorage::AlterLockHolder IStorage::lockForAlter(const std::chrono::milliseconds
"Possible deadlock avoided. Client should retry.",
getStorageID().getFullTableName(), acquire_timeout.count());
if (is_dropped)
if (is_dropped || is_detached)
throw Exception(ErrorCodes::TABLE_IS_DROPPED, "Table {} is dropped or detached", getStorageID());
return lock;
@ -93,7 +93,7 @@ TableExclusiveLockHolder IStorage::lockExclusively(const String & query_id, cons
TableExclusiveLockHolder result;
result.drop_lock = tryLockTimed(drop_lock, RWLockImpl::Write, query_id, acquire_timeout);
if (is_dropped)
if (is_dropped || is_detached)
throw Exception(ErrorCodes::TABLE_IS_DROPPED, "Table {} is dropped or detached", getStorageID());
return result;


@ -561,6 +561,7 @@ public:
virtual void onActionLockRemove(StorageActionBlockType /* action_type */) {}
std::atomic<bool> is_dropped{false};
std::atomic<bool> is_detached{false};
/// Does table support index for IN sections
virtual bool supportsIndexForIn() const { return false; }


@ -470,7 +470,6 @@ void StorageLiveView::drop()
DatabaseCatalog::instance().removeViewDependency(select_table_id, table_id);
std::lock_guard lock(mutex);
is_dropped = true;
condition.notify_all();
}


@ -31,10 +31,11 @@ std::vector<String> AsyncBlockIDsCache::getChildren()
{
auto zookeeper = storage.getZooKeeper();
auto watch_callback = [&](const Coordination::WatchResponse &)
auto watch_callback = [last_time = this->last_updatetime.load()
, update_min_interval = this->update_min_interval
, task = task->shared_from_this()](const Coordination::WatchResponse &)
{
auto now = std::chrono::steady_clock::now();
auto last_time = last_updatetime.load();
if (now - last_time < update_min_interval)
{
std::chrono::milliseconds sleep_time = std::chrono::duration_cast<std::chrono::milliseconds>(update_min_interval - (now - last_time));


@ -680,14 +680,15 @@ void DataPartStorageOnDisk::rename(
{
disk.setLastModified(from, Poco::Timestamp::fromEpochTime(time(nullptr)));
disk.moveDirectory(from, to);
/// Only after moveDirectory(), because before that the directory does not exist.
SyncGuardPtr to_sync_guard;
if (fsync_part_dir)
to_sync_guard = volume->getDisk()->getDirectorySyncGuard(to);
});
part_dir = new_part_dir;
root_path = new_root_path;
SyncGuardPtr sync_guard;
if (fsync_part_dir)
sync_guard = volume->getDisk()->getDirectorySyncGuard(getRelativePath());
}
void DataPartStorageOnDisk::changeRootPath(const std::string & from_root, const std::string & to_root)


@ -828,6 +828,8 @@ void Fetcher::downloadBaseOrProjectionPartToDisk(
size_t files;
readBinary(files, in);
std::vector<std::unique_ptr<WriteBufferFromFileBase>> written_files;
for (size_t i = 0; i < files; ++i)
{
String file_name;
@ -845,8 +847,8 @@ void Fetcher::downloadBaseOrProjectionPartToDisk(
"This may happen if we are trying to download part from malicious replica or logical error.",
absolute_file_path, data_part_storage->getRelativePath());
auto file_out = data_part_storage->writeFile(file_name, std::min<UInt64>(file_size, DBMS_DEFAULT_BUFFER_SIZE), {});
HashingWriteBuffer hashing_out(*file_out);
written_files.emplace_back(data_part_storage->writeFile(file_name, std::min<UInt64>(file_size, DBMS_DEFAULT_BUFFER_SIZE), {}));
HashingWriteBuffer hashing_out(*written_files.back());
copyDataWithThrottler(in, hashing_out, file_size, blocker.getCounter(), throttler);
if (blocker.isCancelled())
@ -870,9 +872,14 @@ void Fetcher::downloadBaseOrProjectionPartToDisk(
file_name != "columns.txt" &&
file_name != IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME)
checksums.addFile(file_name, file_size, expected_hash);
}
/// Call fsync for all files at once in an attempt to decrease latency
for (auto & file : written_files)
{
file->finalize();
if (sync)
hashing_out.sync();
file->sync();
}
}


@ -630,6 +630,8 @@ void finalizeMutatedPart(
ContextPtr context,
bool sync)
{
std::vector<std::unique_ptr<WriteBufferFromFileBase>> written_files;
if (new_data_part->uuid != UUIDHelpers::Nil)
{
auto out = new_data_part->getDataPartStorage().writeFile(IMergeTreeDataPart::UUID_FILE_NAME, 4096, context->getWriteSettings());
@ -637,8 +639,7 @@ void finalizeMutatedPart(
writeUUIDText(new_data_part->uuid, out_hashing);
new_data_part->checksums.files[IMergeTreeDataPart::UUID_FILE_NAME].file_size = out_hashing.count();
new_data_part->checksums.files[IMergeTreeDataPart::UUID_FILE_NAME].file_hash = out_hashing.getHash();
if (sync)
out_hashing.sync();
written_files.push_back(std::move(out));
}
if (execute_ttl_type != ExecuteTTLType::NONE)
@ -649,43 +650,47 @@ void finalizeMutatedPart(
new_data_part->ttl_infos.write(out_hashing);
new_data_part->checksums.files["ttl.txt"].file_size = out_hashing.count();
new_data_part->checksums.files["ttl.txt"].file_hash = out_hashing.getHash();
if (sync)
out_hashing.sync();
written_files.push_back(std::move(out_ttl));
}
if (!new_data_part->getSerializationInfos().empty())
{
auto out = new_data_part->getDataPartStorage().writeFile(IMergeTreeDataPart::SERIALIZATION_FILE_NAME, 4096, context->getWriteSettings());
HashingWriteBuffer out_hashing(*out);
auto out_serialization = new_data_part->getDataPartStorage().writeFile(IMergeTreeDataPart::SERIALIZATION_FILE_NAME, 4096, context->getWriteSettings());
HashingWriteBuffer out_hashing(*out_serialization);
new_data_part->getSerializationInfos().writeJSON(out_hashing);
new_data_part->checksums.files[IMergeTreeDataPart::SERIALIZATION_FILE_NAME].file_size = out_hashing.count();
new_data_part->checksums.files[IMergeTreeDataPart::SERIALIZATION_FILE_NAME].file_hash = out_hashing.getHash();
if (sync)
out_hashing.sync();
written_files.push_back(std::move(out_serialization));
}
{
/// Write file with checksums.
auto out_checksums = new_data_part->getDataPartStorage().writeFile("checksums.txt", 4096, context->getWriteSettings());
new_data_part->checksums.write(*out_checksums);
if (sync)
out_checksums->sync();
} /// close fd
written_files.push_back(std::move(out_checksums));
}
{
auto out = new_data_part->getDataPartStorage().writeFile(IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096, context->getWriteSettings());
DB::writeText(queryToString(codec->getFullCodecDesc()), *out);
if (sync)
out->sync();
} /// close fd
auto out_comp = new_data_part->getDataPartStorage().writeFile(IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096, context->getWriteSettings());
DB::writeText(queryToString(codec->getFullCodecDesc()), *out_comp);
written_files.push_back(std::move(out_comp));
}
{
/// Write a file with a description of columns.
auto out_columns = new_data_part->getDataPartStorage().writeFile("columns.txt", 4096, context->getWriteSettings());
new_data_part->getColumns().writeText(*out_columns);
written_files.push_back(std::move(out_columns));
}
for (auto & file : written_files)
{
file->finalize();
if (sync)
out_columns->sync();
} /// close fd
file->sync();
}
/// Close files
written_files.clear();
new_data_part->rows_count = source_part->rows_count;
new_data_part->index_granularity = source_part->index_granularity;


@ -30,6 +30,7 @@ const char * auto_contributors[] {
"Aleksandr Shalimov",
"Aleksandra (Ася)",
"Aleksandrov Vladimir",
"Aleksei Filatov",
"Aleksei Levushkin",
"Aleksei Semiglazov",
"Aleksey",
@ -192,6 +193,7 @@ const char * auto_contributors[] {
"Bill",
"BiteTheDDDDt",
"BlahGeek",
"Bo Lu",
"Bogdan",
"Bogdan Voronin",
"BohuTANG",
@ -256,6 +258,7 @@ const char * auto_contributors[] {
"Denis Krivak",
"Denis Zhuravlev",
"Denny Crane",
"Denys Golotiuk",
"Derek Chia",
"Derek Perkins",
"Diego Nieto (lesandie)",
@ -300,6 +303,7 @@ const char * auto_contributors[] {
"Elizaveta Mironyuk",
"Elykov Alexandr",
"Emmanuel Donin de Rosière",
"Enrique Herreros",
"Eric",
"Eric Daniel",
"Erixonich",
@ -476,6 +480,7 @@ const char * auto_contributors[] {
"Kirill Shvakov",
"Koblikov Mihail",
"KochetovNicolai",
"Konstantin Bogdanov",
"Konstantin Grabar",
"Konstantin Ilchenko",
"Konstantin Lebedev",
@ -571,6 +576,7 @@ const char * auto_contributors[] {
"Mc.Spring",
"Meena Renganathan",
"Meena-Renganathan",
"MeenaRenganathan22",
"MeiK",
"Memo",
"Metehan Çetinkaya",
@ -866,10 +872,12 @@ const char * auto_contributors[] {
"VDimir",
"VVMak",
"Vadim",
"Vadim Akhma",
"Vadim Plakhtinskiy",
"Vadim Skipin",
"Vadim Volodin",
"VadimPE",
"Vage Ogannisian",
"Val",
"Valera Ryaboshapko",
"Varinara",
@ -1033,6 +1041,7 @@ const char * auto_contributors[] {
"bobrovskij artemij",
"booknouse",
"bseng",
"candiduslynx",
"canenoneko",
"caspian",
"cekc",
@ -1266,6 +1275,7 @@ const char * auto_contributors[] {
"maxim-babenko",
"maxkuzn",
"maxulan",
"mayamika",
"mehanizm",
"melin",
"memo",
@ -1348,7 +1358,10 @@ const char * auto_contributors[] {
"ritaank",
"rnbondarenko",
"robert",
"robot-ch-test-poll1",
"robot-ch-test-poll4",
"robot-clickhouse",
"robot-clickhouse-ci-1",
"robot-metrika-test",
"rodrigargar",
"roman",
@ -1372,7 +1385,9 @@ const char * auto_contributors[] {
"shedx",
"shuchaome",
"shuyang",
"sichenzhao",
"simon-says",
"simpleton",
"snyk-bot",
"songenjie",
"sperlingxx",
@ -1380,6 +1395,7 @@ const char * auto_contributors[] {
"spongedc",
"spume",
"spyros87",
"stan",
"stavrolia",
"stepenhu",
"su-houzhen",
@ -1435,6 +1451,7 @@ const char * auto_contributors[] {
"wangdh15",
"weeds085490",
"whysage",
"wineternity",
"wuxiaobai24",
"wzl",
"xPoSx",
@ -1458,6 +1475,7 @@ const char * auto_contributors[] {
"yonesko",
"youenn lebras",
"young scott",
"yuanyimeng",
"yuchuansun",
"yuefoo",
"yulu86",


@ -83,7 +83,7 @@ ColumnsDescription TableFunctionHDFSCluster::getActualTableStructure(ContextPtr
StoragePtr TableFunctionHDFSCluster::getStorage(
const String & /*source*/, const String & /*format_*/, const ColumnsDescription &, ContextPtr context,
const String & /*source*/, const String & /*format_*/, const ColumnsDescription & columns, ContextPtr context,
const std::string & table_name, const String & /*compression_method_*/) const
{
StoragePtr storage;
@ -94,7 +94,7 @@ StoragePtr TableFunctionHDFSCluster::getStorage(
filename,
StorageID(getDatabaseName(), table_name),
format,
getActualTableStructure(context),
columns,
ConstraintsDescription{},
String{},
context,
@ -107,8 +107,8 @@ StoragePtr TableFunctionHDFSCluster::getStorage(
storage = std::make_shared<StorageHDFSCluster>(
context,
cluster_name, filename, StorageID(getDatabaseName(), table_name),
format, getActualTableStructure(context), ConstraintsDescription{},
compression_method);
format, columns, ConstraintsDescription{},
compression_method, structure != "auto");
}
return storage;
}


@ -28,7 +28,6 @@ public:
{
return name;
}
bool hasStaticStructure() const override { return true; }
protected:
StoragePtr getStorage(


@ -232,19 +232,52 @@ def need_retry(args, stdout, stderr, total_time):
)
def get_processlist(args):
def get_processlist_with_stacktraces(args):
try:
if args.replicated_database:
return clickhouse_execute_json(
args,
"""
SELECT materialize((hostName(), tcpPort())) as host, *
FROM clusterAllReplicas('test_cluster_database_replicated', system.processes)
WHERE query NOT LIKE '%system.processes%'
SELECT materialize(hostName() || '::' || tcpPort()::String) as host_port, *
-- NOTE: view() here to do JOIN on shards, instead of initiator
FROM clusterAllReplicas('test_cluster_database_replicated', view(
SELECT
groupArray((s.thread_id, arrayStringConcat(arrayMap(
x -> concat(addressToLine(x), '::', demangle(addressToSymbol(x))),
s.trace), '\n') AS stacktrace
)) AS stacktraces,
p.*
FROM system.processes p
JOIN system.stack_trace s USING (query_id)
WHERE query NOT LIKE '%system.processes%'
GROUP BY p.*
))
ORDER BY elapsed DESC
""",
settings={
"allow_introspection_functions": 1,
},
)
else:
return clickhouse_execute_json(args, "SHOW PROCESSLIST")
return clickhouse_execute_json(
args,
"""
SELECT
groupArray((s.thread_id, arrayStringConcat(arrayMap(
x -> concat(addressToLine(x), '::', demangle(addressToSymbol(x))),
s.trace), '\n') AS stacktrace
)) AS stacktraces,
p.*
FROM system.processes p
JOIN system.stack_trace s USING (query_id)
WHERE query NOT LIKE '%system.processes%'
GROUP BY p.*
ORDER BY elapsed DESC
""",
settings={
"allow_introspection_functions": 1,
},
)
except Exception as e:
return "Failed to get processlist: " + str(e)
@ -1223,7 +1256,7 @@ class TestSuite:
line = line.strip()
if line and not is_shebang(line):
return line
return ''
return ""
def load_tags_from_file(filepath):
comment_sign = get_comment_sign(filepath)
@ -1750,7 +1783,7 @@ def removesuffix(text, *suffixes):
return text
def reportCoverageFor(args, what, query, permissive = False):
def reportCoverageFor(args, what, query, permissive=False):
value = clickhouse_execute(args, query).decode()
if value != "":
@ -1763,10 +1796,11 @@ def reportCoverageFor(args, what, query, permissive = False):
def reportCoverage(args):
return reportCoverageFor(
args,
"functions",
"""
return (
reportCoverageFor(
args,
"functions",
"""
SELECT name
FROM system.functions
WHERE NOT is_aggregate AND origin = 'System' AND alias_to = ''
@ -1776,11 +1810,12 @@ def reportCoverage(args):
)
ORDER BY name
""",
True
) and reportCoverageFor(
args,
"aggregate functions",
"""
True,
)
and reportCoverageFor(
args,
"aggregate functions",
"""
SELECT name
FROM system.functions
WHERE is_aggregate AND origin = 'System' AND alias_to = ''
@ -1789,11 +1824,12 @@ def reportCoverage(args):
SELECT arrayJoin(used_aggregate_functions) FROM system.query_log WHERE event_date >= yesterday()
)
ORDER BY name
"""
) and reportCoverageFor(
args,
"aggregate function combinators",
"""
""",
)
and reportCoverageFor(
args,
"aggregate function combinators",
"""
SELECT name
FROM system.aggregate_function_combinators
WHERE NOT is_internal
@ -1802,11 +1838,12 @@ def reportCoverage(args):
SELECT arrayJoin(used_aggregate_function_combinators) FROM system.query_log WHERE event_date >= yesterday()
)
ORDER BY name
"""
) and reportCoverageFor(
args,
"data type families",
"""
""",
)
and reportCoverageFor(
args,
"data type families",
"""
SELECT name
FROM system.data_type_families
WHERE alias_to = '' AND name NOT LIKE 'Interval%'
@ -1815,7 +1852,8 @@ def reportCoverage(args):
SELECT arrayJoin(used_data_type_families) FROM system.query_log WHERE event_date >= yesterday()
)
ORDER BY name
"""
""",
)
)
def reportLogStats(args):
@ -1924,7 +1962,9 @@ def main(args):
args, "system", "processes", "is_all_data_sent"
)
if args.s3_storage and (BuildFlags.THREAD in args.build_flags or BuildFlags.DEBUG in args.build_flags):
if args.s3_storage and (
BuildFlags.THREAD in args.build_flags or BuildFlags.DEBUG in args.build_flags
):
args.no_random_settings = True
if args.skip:
@ -1996,10 +2036,9 @@ def main(args):
exit_code.value = 1
if args.hung_check:
# Some queries may execute in background for some time after test was finished. This is normal.
for _ in range(1, 60):
processlist = get_processlist(args)
processlist = get_processlist_with_stacktraces(args)
if not processlist:
break
sleep(1)
@ -2013,7 +2052,6 @@ def main(args):
print(json.dumps(processlist, indent=4))
print(get_transactions_list(args))
print_stacktraces()
exit_code.value = 1
else:
print(colored("\nNo queries hung.", args, "green", attrs=["bold"]))


@ -0,0 +1,11 @@
-- Tags: no-fasttest, no-parallel, no-cpu-aarch64
-- Tag no-fasttest: Depends on Java
insert into table function hdfs('hdfs://localhost:12222/test_02536.jsonl', 'TSV') select '{"x" : {"a" : 1, "b" : 2}}' settings hdfs_truncate_on_insert=1;
drop table if exists test;
create table test (x Tuple(a UInt32, b UInt32)) engine=Memory();
insert into test select * from hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_02536.jsonl') settings use_structure_from_insertion_table_in_table_functions=0; -- {serverError TYPE_MISMATCH}
insert into test select * from hdfsCluster('test_cluster_two_shards_localhost', 'hdfs://localhost:12222/test_02536.jsonl') settings use_structure_from_insertion_table_in_table_functions=1;
select * from test;
drop table test;