Merge branch 'ClickHouse:master' into zvonand-datetime-ranges

This commit is contained in:
Andrey Zvonov 2023-10-26 11:42:13 +02:00 committed by GitHub
commit 5737369d0b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
43 changed files with 773 additions and 181 deletions

View File

@ -210,7 +210,7 @@ detach
quit
" > script.gdb
gdb -batch -command script.gdb -p "$(cat /var/run/clickhouse-server/clickhouse-server.pid)" &
gdb -batch -command script.gdb -p $server_pid &
sleep 5
# gdb will send SIGSTOP, spend some time loading debug info and then send SIGCONT, wait for it (up to send_timeout, 300s)
time clickhouse-client --query "SELECT 'Connected to clickhouse-server after attaching gdb'" ||:
@ -219,13 +219,12 @@ quit
# to freeze and the fuzzer will fail. In debug build it can take a lot of time.
for _ in {1..180}
do
sleep 1
if clickhouse-client --query "select 1"
then
break
fi
sleep 1
done
clickhouse-client --query "select 1" # This checks that the server is responding
kill -0 $server_pid # This checks that it is our server that is started and not some other one
echo 'Server started and responded'

View File

@ -2403,6 +2403,17 @@ See also:
- [optimize_functions_to_subcolumns](#optimize-functions-to-subcolumns)
## optimize_trivial_approximate_count_query {#optimize_trivial_approximate_count_query}
Use an approximate value for trivial count optimization of storages that support such estimation, for example, EmbeddedRocksDB.
Possible values:
- 0 — Optimization disabled.
- 1 — Optimization enabled.
Default value: `0`.
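A minimal sketch of how the setting might be used, assuming a hypothetical `EmbeddedRocksDB` table named `rocks`:

```sql
-- Hypothetical table; RocksDB can report an estimated number of keys.
CREATE TABLE rocks (key UInt64, value String) ENGINE = EmbeddedRocksDB PRIMARY KEY key;

-- With the setting enabled, the trivial count may use the storage's estimate
-- instead of scanning the data, so the result can be approximate.
SELECT count() FROM rocks SETTINGS optimize_trivial_approximate_count_query = 1;
```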
## optimize_count_from_files {#optimize_count_from_files}
Enables or disables the optimization of counting the number of rows from files in different input formats. It applies to table functions/engines `file`/`s3`/`url`/`hdfs`/`azureBlobStorage`.
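A brief sketch, assuming a hypothetical file `data.parquet` readable through the `file` table function; for input formats that expose a row count, the query can avoid reading the data:

```sql
SELECT count()
FROM file('data.parquet', 'Parquet')
SETTINGS optimize_count_from_files = 1;
```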

View File

@ -5,19 +5,39 @@ sidebar_label: CHECK TABLE
title: "CHECK TABLE Statement"
---
Checks if the data in the table is corrupted.
The `CHECK TABLE` query in ClickHouse is used to perform a validation check on a specific table or its partitions. It ensures the integrity of the data by verifying the checksums and other internal data structures.
``` sql
CHECK TABLE [db.]name [PARTITION partition_expr]
In particular, it compares actual file sizes with the expected values stored on the server. If the file sizes do not match the stored values, it means the data is corrupted. This can be caused, for example, by a system crash during query execution.
:::note
The `CHECK TABLE` query may read all the data in the table and hold some resources, making it resource-intensive.
Consider the potential impact on performance and resource utilization before executing this query.
:::
## Syntax
The basic syntax of the query is as follows:
```sql
CHECK TABLE table_name [PARTITION partition_expression | PART part_name] [FORMAT format] [SETTINGS check_query_single_value_result = (0|1) [, other_settings]]
```
The `CHECK TABLE` query compares actual file sizes with the expected values which are stored on the server. If the file sizes do not match the stored values, it means the data is corrupted. This can be caused, for example, by a system crash during query execution.
- `table_name`: Specifies the name of the table that you want to check.
- `partition_expression`: (Optional) If you want to check a specific partition of the table, you can use this expression to specify the partition.
- `part_name`: (Optional) If you want to check a specific part in the table, you can add a string literal to specify a part name.
- `FORMAT format`: (Optional) Allows you to specify the output format of the result.
- `SETTINGS`: (Optional) Allows additional settings.
- **`check_query_single_value_result`**: (Optional) This setting allows you to toggle between a detailed result (`0`) and a summarized result (`1`).
- Other settings can be applied as well. If you don't require a deterministic order for the results, you can set `max_threads` to a value greater than one to speed up the query (see the example below).
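For example, a parallel per-part check over a hypothetical table `table_name` might look like this sketch:

```sql
CHECK TABLE table_name
FORMAT PrettyCompactMonoBlock
SETTINGS check_query_single_value_result = 0, max_threads = 4;
```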
The query response contains the `result` column with a single row. The row has a value of
[Boolean](../../sql-reference/data-types/boolean.md) type:
- 0 - The data in the table is corrupted.
- 1 - The data maintains integrity.
The query response depends on the value of the `check_query_single_value_result` setting.
With `check_query_single_value_result = 1`, only the `result` column with a single row is returned. The value in this row is `1` if the integrity check passed and `0` if the data is corrupted.
With `check_query_single_value_result = 0` the query returns the following columns:
- `part_path`: Indicates the path to the data part or file name.
- `is_passed`: Returns 1 if the check for this part is successful, 0 otherwise.
- `message`: Any additional messages related to the check, such as errors or success messages.
The `CHECK TABLE` query supports the following table engines:
@ -26,30 +46,15 @@ The `CHECK TABLE` query supports the following table engines:
- [StripeLog](../../engines/table-engines/log-family/stripelog.md)
- [MergeTree family](../../engines/table-engines/mergetree-family/mergetree.md)
Performed over the tables with another table engines causes an exception.
Performing the query over tables with other table engines causes a `NOT_IMPLEMENTED` exception.
Engines from the `*Log` family do not provide automatic data recovery on failure. Use the `CHECK TABLE` query to track data loss in a timely manner.
## Checking the MergeTree Family Tables
## Examples
For `MergeTree` family engines, if [check_query_single_value_result](../../operations/settings/settings.md#check_query_single_value_result) = 0, the `CHECK TABLE` query shows a check status for every individual data part of a table on the local server.
By default, the `CHECK TABLE` query shows the general table check status:
```sql
SET check_query_single_value_result = 0;
CHECK TABLE test_table;
```
```text
┌─part_path─┬─is_passed─┬─message─┐
│ all_1_4_1 │ 1 │ │
│ all_1_4_2 │ 1 │ │
└───────────┴───────────┴─────────┘
```
If `check_query_single_value_result` = 1, the `CHECK TABLE` query shows the general table check status.
```sql
SET check_query_single_value_result = 1;
CHECK TABLE test_table;
```
@ -59,11 +64,86 @@ CHECK TABLE test_table;
└────────┘
```
If you want to see the check status for every individual data part, you may use the `check_query_single_value_result` setting.
Also, to check a specific partition of the table, you can use the `PARTITION` keyword.
```sql
CHECK TABLE t0 PARTITION ID '201003'
FORMAT PrettyCompactMonoBlock
SETTINGS check_query_single_value_result = 0
```
Output:
```text
┌─part_path────┬─is_passed─┬─message─┐
│ 201003_7_7_0 │ 1 │ │
│ 201003_3_3_0 │ 1 │ │
└──────────────┴───────────┴─────────┘
```
Similarly, you can check a specific part of the table by using the `PART` keyword.
```sql
CHECK TABLE t0 PART '201003_7_7_0'
FORMAT PrettyCompactMonoBlock
SETTINGS check_query_single_value_result = 0
```
Output:
```text
┌─part_path────┬─is_passed─┬─message─┐
│ 201003_7_7_0 │ 1 │ │
└──────────────┴───────────┴─────────┘
```
Note that when the part does not exist, the query returns an error:
```sql
CHECK TABLE t0 PART '201003_111_222_0'
```
```text
DB::Exception: No such data part '201003_111_222_0' to check in table 'default.t0'. (NO_SUCH_DATA_PART)
```
### Receiving a 'Corrupted' Result
:::warning
Disclaimer: The procedure described here, including manually manipulating or removing files directly from the data directory, is for experimental or development environments only. Do **not** attempt this on a production server, as it may lead to data loss or other unintended consequences.
:::
Remove the existing checksum file:
```bash
rm /var/lib/clickhouse-server/data/default/t0/201003_3_3_0/checksums.txt
```
```sql
CHECK TABLE t0 PARTITION ID '201003'
FORMAT PrettyCompactMonoBlock
SETTINGS check_query_single_value_result = 0
```

Output:
```text
┌─part_path────┬─is_passed─┬─message──────────────────────────────────┐
│ 201003_7_7_0 │ 1 │ │
│ 201003_3_3_0 │ 1 │ Checksums recounted and written to disk. │
└──────────────┴───────────┴──────────────────────────────────────────┘
```
If the `checksums.txt` file is missing, it can be restored. It will be recalculated and rewritten during the execution of the `CHECK TABLE` query for the specific partition, and the status will still be reported as 'success'.
## If the Data Is Corrupted
If the table is corrupted, you can copy the non-corrupted data to another table (see the sketch after these steps). To do this:
1. Create a new table with the same structure as the damaged table. To do this, execute the query `CREATE TABLE <new_table_name> AS <damaged_table_name>`.
2. Set the [max_threads](../../operations/settings/settings.md#settings-max_threads) value to 1 to process the next query in a single thread. To do this run the query `SET max_threads = 1`.
2. Set the `max_threads` value to 1 to process the next query in a single thread. To do this run the query `SET max_threads = 1`.
3. Execute the query `INSERT INTO <new_table_name> SELECT * FROM <damaged_table_name>`. This query copies the non-corrupted data from the damaged table to another table. Only the data before the corrupted part will be copied.
4. Restart the `clickhouse-client` to reset the `max_threads` value.
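Put together, the recovery steps above might look like the following sketch (the table names `t_damaged` and `t_new` are hypothetical placeholders):

```sql
-- 1. Create an empty table with the same structure as the damaged one.
CREATE TABLE t_new AS t_damaged;

-- 2. Use a single thread so that only the data before the corrupted part is copied.
SET max_threads = 1;

-- 3. Copy the non-corrupted data.
INSERT INTO t_new SELECT * FROM t_damaged;
```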

View File

@ -82,33 +82,32 @@ size_t FileChecker::getTotalSize() const
}
CheckResults FileChecker::check() const
FileChecker::DataValidationTasksPtr FileChecker::getDataValidationTasks()
{
if (map.empty())
return std::make_unique<DataValidationTasks>(map);
}
std::optional<CheckResult> FileChecker::checkNextEntry(DataValidationTasksPtr & check_data_tasks) const
{
String name;
size_t expected_size;
bool is_finished = check_data_tasks->next(name, expected_size);
if (is_finished)
return {};
CheckResults results;
String path = parentPath(files_info_path) + name;
bool exists = fileReallyExists(path);
auto real_size = exists ? getRealFileSize(path) : 0; /// No race condition assuming no one else is working with these files.
for (const auto & name_size : map)
if (real_size != expected_size)
{
const String & name = name_size.first;
String path = parentPath(files_info_path) + name;
bool exists = fileReallyExists(path);
auto real_size = exists ? getRealFileSize(path) : 0; /// No race condition assuming no one else is working with these files.
if (real_size != name_size.second)
{
String failure_message = exists
? ("Size of " + path + " is wrong. Size is " + toString(real_size) + " but should be " + toString(name_size.second))
: ("File " + path + " doesn't exist");
results.emplace_back(name, false, failure_message);
break;
}
results.emplace_back(name, true, "");
String failure_message = exists
? ("Size of " + path + " is wrong. Size is " + toString(real_size) + " but should be " + toString(expected_size))
: ("File " + path + " doesn't exist");
return CheckResult(name, false, failure_message);
}
return results;
return CheckResult(name, true, "");
}
void FileChecker::repair()

View File

@ -3,6 +3,7 @@
#include <Storages/CheckResults.h>
#include <map>
#include <base/types.h>
#include <mutex>
namespace Poco { class Logger; }
@ -28,7 +29,11 @@ public:
bool empty() const { return map.empty(); }
/// Check the files whose parameters are specified in sizes.json
CheckResults check() const;
/// See comment in IStorage::checkDataNext
struct DataValidationTasks;
using DataValidationTasksPtr = std::unique_ptr<DataValidationTasks>;
DataValidationTasksPtr getDataValidationTasks();
std::optional<CheckResult> checkNextEntry(DataValidationTasksPtr & check_data_tasks) const;
/// Truncate files that have excessive size to the expected size.
/// Throw exception if the file size is less than expected.
@ -41,6 +46,36 @@ public:
/// Returns total size of all files.
size_t getTotalSize() const;
struct DataValidationTasks
{
DataValidationTasks(const std::map<String, size_t> & map_)
: map(map_), it(map.begin())
{}
bool next(String & out_name, size_t & out_size)
{
std::lock_guard lock(mutex);
if (it == map.end())
return true;
out_name = it->first;
out_size = it->second;
++it;
return false;
}
size_t size() const
{
std::lock_guard lock(mutex);
return std::distance(it, map.end());
}
const std::map<String, size_t> & map;
mutable std::mutex mutex;
using Iterator = std::map<String, size_t>::const_iterator;
Iterator it;
};
private:
void load();

View File

@ -542,6 +542,7 @@ class IColumn;
M(Bool, database_atomic_wait_for_drop_and_detach_synchronously, false, "When executing DROP or DETACH TABLE in Atomic database, wait for table data to be finally dropped or detached.", 0) \
M(Bool, enable_scalar_subquery_optimization, true, "If it is set to true, prevent scalar subqueries from (de)serializing large scalar values and possibly avoid running the same subquery more than once.", 0) \
M(Bool, optimize_trivial_count_query, true, "Process trivial 'SELECT count() FROM table' query from metadata.", 0) \
M(Bool, optimize_trivial_approximate_count_query, false, "Use an approximate value for trivial count optimization of storages that support such estimations.", 0) \
M(Bool, optimize_count_from_files, true, "Optimize counting rows from files in supported input formats", 0) \
M(Bool, use_cache_for_count_from_files, true, "Use cache to count the number of rows in files", 0) \
M(Bool, optimize_respect_aliases, true, "If it is set to true, it will respect aliases in WHERE/GROUP BY/ORDER BY, that will help with partition pruning/secondary indexes/optimize_aggregation_in_order/optimize_read_in_order/optimize_trivial_count", 0) \

View File

@ -3,77 +3,205 @@
#include <Access/Common/AccessFlags.h>
#include <Storages/IStorage.h>
#include <Parsers/ASTCheckQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <Columns/ColumnsNumber.h>
#include <Common/typeid_cast.h>
#include <Interpreters/ProcessList.h>
#include <algorithm>
#include <Columns/IColumn.h>
#include <Processors/ResizeProcessor.h>
#include <Processors/IAccumulatingTransform.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace
{
NamesAndTypes getBlockStructure()
Block getSingleValueBlock(UInt8 value)
{
return {
return Block{{ColumnUInt8::create(1, value), std::make_shared<DataTypeUInt8>(), "result"}};
}
Block getHeaderForCheckResult()
{
auto names_and_types = NamesAndTypes{
{"part_path", std::make_shared<DataTypeString>()},
{"is_passed", std::make_shared<DataTypeUInt8>()},
{"message", std::make_shared<DataTypeString>()},
};
return Block({
{names_and_types[0].type->createColumn(), names_and_types[0].type, names_and_types[0].name},
{names_and_types[1].type->createColumn(), names_and_types[1].type, names_and_types[1].name},
{names_and_types[2].type->createColumn(), names_and_types[2].type, names_and_types[2].name},
});
}
Chunk getChunkFromCheckResult(const CheckResult & check_result)
{
MutableColumns columns = getHeaderForCheckResult().cloneEmptyColumns();
columns[0]->insert(check_result.fs_path);
columns[1]->insert(static_cast<UInt8>(check_result.success));
columns[2]->insert(check_result.failure_message);
return Chunk(std::move(columns), 1);
}
class TableCheckWorkerProcessor : public ISource
{
public:
TableCheckWorkerProcessor(IStorage::DataValidationTasksPtr check_data_tasks_, StoragePtr table_)
: ISource(getHeaderForCheckResult())
, table(table_)
, check_data_tasks(check_data_tasks_)
{
}
String getName() const override { return "TableCheckWorkerProcessor"; }
protected:
std::optional<Chunk> tryGenerate() override
{
auto check_result = table->checkDataNext(check_data_tasks);
if (!check_result)
return {};
/// We could omit the manual `progress` call; ISource may count it automatically from the returned chunk.
/// However, we want to report only rows in progress, since bytes don't make sense here.
progress(1, 0);
if (!check_result->success)
{
LOG_WARNING(&Poco::Logger::get("InterpreterCheckQuery"),
"Check query for table {} failed, path {}, reason: {}",
table->getStorageID().getNameForLogs(),
check_result->fs_path,
check_result->failure_message);
}
return getChunkFromCheckResult(*check_result);
}
private:
StoragePtr table;
IStorage::DataValidationTasksPtr check_data_tasks;
};
class TableCheckResultEmitter : public IAccumulatingTransform
{
public:
TableCheckResultEmitter() : IAccumulatingTransform(getHeaderForCheckResult(), getSingleValueBlock(1).cloneEmpty()) {}
String getName() const override { return "TableCheckResultEmitter"; }
void consume(Chunk chunk) override
{
if (result_value == 0)
return;
auto columns = chunk.getColumns();
if (columns.size() != 3)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong number of columns: {}", columns.size());
const auto * col = checkAndGetColumn<ColumnUInt8>(columns[1].get());
for (size_t i = 0; i < col->size(); ++i)
{
if (col->getElement(i) == 0)
{
result_value = 0;
return;
}
}
}
Chunk generate() override
{
if (is_value_emitted.exchange(true))
return {};
auto block = getSingleValueBlock(result_value);
return Chunk(block.getColumns(), block.rows());
}
private:
std::atomic<UInt8> result_value{1};
std::atomic_bool is_value_emitted{false};
};
}
InterpreterCheckQuery::InterpreterCheckQuery(const ASTPtr & query_ptr_, ContextPtr context_) : WithContext(context_), query_ptr(query_ptr_)
InterpreterCheckQuery::InterpreterCheckQuery(const ASTPtr & query_ptr_, ContextPtr context_)
: WithContext(context_)
, query_ptr(query_ptr_)
{
}
BlockIO InterpreterCheckQuery::execute()
{
const auto & check = query_ptr->as<ASTCheckQuery &>();
auto table_id = getContext()->resolveStorageID(check, Context::ResolveOrdinary);
const auto & context = getContext();
auto table_id = context->resolveStorageID(check, Context::ResolveOrdinary);
getContext()->checkAccess(AccessType::SHOW_TABLES, table_id);
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, getContext());
auto check_results = table->checkData(query_ptr, getContext());
context->checkAccess(AccessType::SHOW_TABLES, table_id);
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, context);
Block block;
if (getContext()->getSettingsRef().check_query_single_value_result)
{
bool result = std::all_of(check_results.begin(), check_results.end(), [] (const CheckResult & res) { return res.success; });
auto column = ColumnUInt8::create();
column->insertValue(static_cast<UInt64>(result));
block = Block{{std::move(column), std::make_shared<DataTypeUInt8>(), "result"}};
}
else
{
auto block_structure = getBlockStructure();
auto path_column = block_structure[0].type->createColumn();
auto is_passed_column = block_structure[1].type->createColumn();
auto message_column = block_structure[2].type->createColumn();
auto check_data_tasks = table->getCheckTaskList(query_ptr, context);
for (const auto & check_result : check_results)
{
path_column->insert(check_result.fs_path);
is_passed_column->insert(static_cast<UInt8>(check_result.success));
message_column->insert(check_result.failure_message);
}
block = Block({
{std::move(path_column), block_structure[0].type, block_structure[0].name},
{std::move(is_passed_column), block_structure[1].type, block_structure[1].name},
{std::move(message_column), block_structure[2].type, block_structure[2].name}});
}
const auto & settings = context->getSettingsRef();
BlockIO res;
res.pipeline = QueryPipeline(std::make_shared<SourceFromSingleChunk>(std::move(block)));
{
auto processors = std::make_shared<Processors>();
std::vector<OutputPort *> worker_ports;
size_t num_streams = std::max<size_t>(settings.max_threads, 1);
for (size_t i = 0; i < num_streams; ++i)
{
auto worker_processor = std::make_shared<TableCheckWorkerProcessor>(check_data_tasks, table);
if (i == 0)
worker_processor->addTotalRowsApprox(check_data_tasks->size());
worker_ports.emplace_back(&worker_processor->getPort());
processors->emplace_back(worker_processor);
}
OutputPort * resize_outport;
{
auto resize_processor = std::make_shared<ResizeProcessor>(getHeaderForCheckResult(), worker_ports.size(), 1);
auto & resize_inputs = resize_processor->getInputs();
auto resize_inport_it = resize_inputs.begin();
for (size_t i = 0; i < worker_ports.size(); ++i, ++resize_inport_it)
connect(*worker_ports[i], *resize_inport_it);
processors->emplace_back(resize_processor);
assert(resize_processor->getOutputs().size() == 1);
resize_outport = &resize_processor->getOutputs().front();
}
if (settings.check_query_single_value_result)
{
auto emitter_processor = std::make_shared<TableCheckResultEmitter>();
auto * input_port = &emitter_processor->getInputPort();
processors->emplace_back(emitter_processor);
connect(*resize_outport, *input_port);
}
res.pipeline = QueryPipeline(Pipe(std::move(processors)));
res.pipeline.setNumThreads(num_streams);
}
return res;
}

View File

@ -10,6 +10,7 @@ namespace DB
struct ASTCheckQuery : public ASTQueryWithTableAndOutput
{
ASTPtr partition;
String part_name;
/** Get the text that identifies this element. */
String getID(char delim) const override { return "CheckQuery" + (delim + getDatabase()) + delim + getTable(); }

View File

@ -2,6 +2,7 @@
#include <Parsers/CommonParsers.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/ASTCheckQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ParserPartition.h>
#include <Parsers/parseDatabaseAndTableName.h>
@ -13,9 +14,11 @@ bool ParserCheckQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ParserKeyword s_check_table("CHECK TABLE");
ParserKeyword s_partition("PARTITION");
ParserKeyword s_part("PART");
ParserToken s_dot(TokenType::Dot);
ParserPartition partition_parser;
ParserStringLiteral parser_string_literal;
if (!s_check_table.ignore(pos, expected))
return false;
@ -30,6 +33,17 @@ bool ParserCheckQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (!partition_parser.parse(pos, query->partition, expected))
return false;
}
else if (s_part.ignore(pos, expected))
{
ASTPtr ast_part_name;
if (!parser_string_literal.parse(pos, ast_part_name, expected))
return false;
const auto * ast_literal = ast_part_name->as<ASTLiteral>();
if (!ast_literal || ast_literal->value.getType() != Field::Types::String)
return false;
query->part_name = ast_literal->value.get<const String &>();
}
if (query->database)
query->children.push_back(query->database);

View File

@ -22,6 +22,4 @@ struct CheckResult
{}
};
using CheckResults = std::vector<CheckResult>;
}

View File

@ -276,6 +276,16 @@ bool IStorage::isStaticStorage() const
return false;
}
IStorage::DataValidationTasksPtr IStorage::getCheckTaskList(const ASTPtr & /* query */, ContextPtr /* context */)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Check query is not supported for {} storage", getName());
}
std::optional<CheckResult> IStorage::checkDataNext(DataValidationTasksPtr & /* check_task_list */)
{
return {};
}
void IStorage::adjustCreateQueryForBackup(ASTPtr &) const
{
}

View File

@ -600,8 +600,44 @@ public:
/// Provides a hint that the storage engine may evaluate the IN-condition by using an index.
virtual bool mayBenefitFromIndexForIn(const ASTPtr & /* left_in_operand */, ContextPtr /* query_context */, const StorageMetadataPtr & /* metadata_snapshot */) const { return false; }
/// Checks validity of the data
virtual CheckResults checkData(const ASTPtr & /* query */, ContextPtr /* context */) { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Check query is not supported for {} storage", getName()); }
/** A list of tasks to check the validity of data.
* Each IStorage implementation may interpret this task in its own way.
* E.g. for some storages checking data means checking a list of files in the filesystem, for others it can be a list of parts.
* It may also hold resources (e.g. locks) required during the check.
*/
struct DataValidationTasksBase
{
/// Number of entries left to check.
/// It decreases after each call to checkDataNext().
virtual size_t size() const = 0;
virtual ~DataValidationTasksBase() = default;
};
using DataValidationTasksPtr = std::shared_ptr<DataValidationTasksBase>;
virtual DataValidationTasksPtr getCheckTaskList(const ASTPtr & /* query */, ContextPtr /* context */);
/** Executes one task from the list.
* If no tasks are left, returns nullopt.
* Note: `checkDataNext` accesses `check_task_list` in a thread-safe manner,
* and can be called simultaneously for the same `getCheckTaskList` result
* to process different tasks in parallel.
* Usage:
*
* auto check_task_list = storage.getCheckTaskList(query, context);
* size_t total_tasks = check_task_list->size();
* while (true)
* {
* size_t tasks_left = check_task_list->size();
* std::cout << "Checking data: " << (total_tasks - tasks_left) << " / " << total_tasks << " tasks done." << std::endl;
* auto result = storage.checkDataNext(check_task_list);
* if (!result)
* break;
* doSomething(*result);
* }
*/
virtual std::optional<CheckResult> checkDataNext(DataValidationTasksPtr & check_task_list);
/// Checks that table could be dropped right now
/// Otherwise - throws an exception with detailed information.

View File

@ -609,5 +609,19 @@ void registerStorageEmbeddedRocksDB(StorageFactory & factory)
factory.registerStorage("EmbeddedRocksDB", create, features);
}
std::optional<UInt64> StorageEmbeddedRocksDB::totalRows(const Settings & settings) const
{
if (settings.optimize_trivial_approximate_count_query)
{
std::shared_lock lock(rocksdb_ptr_mx);
if (!rocksdb_ptr)
return {};
UInt64 estimated_rows;
if (!rocksdb_ptr->GetIntProperty("rocksdb.estimate-num-keys", &estimated_rows))
return {};
return estimated_rows;
}
return {};
}
}

View File

@ -83,6 +83,10 @@ public:
bool supportsDelete() const override { return true; }
bool supportsTrivialCountOptimization() const override { return true; }
std::optional<UInt64> totalRows(const Settings & settings) const override;
private:
const String primary_key;
using RocksDBPtr = std::unique_ptr<rocksdb::DB>;

View File

@ -85,6 +85,8 @@ public:
const Names & getKeyNames() const { return key_names; }
bool supportsTrivialCountOptimization() const override { return true; }
private:
Block sample_block;
const Names key_names;

View File

@ -7,6 +7,8 @@
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTCheckQuery.h>
#include <IO/LimitReadBuffer.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadHelpers.h>
@ -57,6 +59,7 @@ namespace ErrorCodes
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int INCORRECT_FILE_NAME;
extern const int CANNOT_RESTORE_TABLE;
extern const int NOT_IMPLEMENTED;
}
/// NOTE: The lock `StorageLog::rwlock` is NOT kept locked while reading,
@ -874,15 +877,23 @@ SinkToStoragePtr StorageLog::write(const ASTPtr & /*query*/, const StorageMetada
return std::make_shared<LogSink>(*this, metadata_snapshot, std::move(lock));
}
CheckResults StorageLog::checkData(const ASTPtr & /* query */, ContextPtr local_context)
IStorage::DataValidationTasksPtr StorageLog::getCheckTaskList(const ASTPtr & query, ContextPtr local_context)
{
const auto * check_query = query->as<ASTCheckQuery>();
if (check_query->partition || !check_query->part_name.empty())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "CHECK PART/PARTITION are not supported for {}", getName());
ReadLock lock{rwlock, getLockTimeout(local_context)};
if (!lock)
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Lock timeout exceeded");
return file_checker.check();
return std::make_unique<DataValidationTasks>(file_checker.getDataValidationTasks(), std::move(lock));
}
std::optional<CheckResult> StorageLog::checkDataNext(DataValidationTasksPtr & check_task_list)
{
return file_checker.checkNextEntry(assert_cast<DataValidationTasks *>(check_task_list.get())->file_checker_tasks);
}
IStorage::ColumnSizeByName StorageLog::getColumnSizes() const
{

View File

@ -59,7 +59,8 @@ public:
void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override;
CheckResults checkData(const ASTPtr & query, ContextPtr local_context) override;
DataValidationTasksPtr getCheckTaskList(const ASTPtr & query, ContextPtr context) override;
std::optional<CheckResult> checkDataNext(DataValidationTasksPtr & check_task_list) override;
void truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &) override;
@ -142,6 +143,19 @@ private:
std::atomic<UInt64> total_rows = 0;
std::atomic<UInt64> total_bytes = 0;
struct DataValidationTasks : public IStorage::DataValidationTasksBase
{
DataValidationTasks(FileChecker::DataValidationTasksPtr file_checker_tasks_, ReadLock && lock_)
: file_checker_tasks(std::move(file_checker_tasks_)), lock(std::move(lock_))
{}
size_t size() const override { return file_checker_tasks->size(); }
FileChecker::DataValidationTasksPtr file_checker_tasks;
/// Lock to prevent table modification while checking
ReadLock lock;
};
FileChecker file_checker;
const size_t max_compress_block_size;

View File

@ -2207,19 +2207,33 @@ void StorageMergeTree::onActionLockRemove(StorageActionBlockType action_type)
background_moves_assignee.trigger();
}
CheckResults StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_context)
IStorage::DataValidationTasksPtr StorageMergeTree::getCheckTaskList(const ASTPtr & query, ContextPtr local_context)
{
CheckResults results;
DataPartsVector data_parts;
if (const auto & check_query = query->as<ASTCheckQuery &>(); check_query.partition)
{
String partition_id = getPartitionIDFromQuery(check_query.partition, local_context);
data_parts = getVisibleDataPartsVectorInPartition(local_context, partition_id);
}
else if (!check_query.part_name.empty())
{
auto part = getPartIfExists(check_query.part_name, {MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated});
if (!part)
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "No such data part '{}' to check in table '{}'",
check_query.part_name, getStorageID().getFullTableName());
data_parts.emplace_back(std::move(part));
}
else
data_parts = getVisibleDataPartsVector(local_context);
for (auto & part : data_parts)
return std::make_unique<DataValidationTasks>(std::move(data_parts), local_context);
}
std::optional<CheckResult> StorageMergeTree::checkDataNext(DataValidationTasksPtr & check_task_list)
{
auto * data_validation_tasks = assert_cast<DataValidationTasks *>(check_task_list.get());
auto local_context = data_validation_tasks->context;
if (auto part = data_validation_tasks->next())
{
/// If the checksums file is not present, calculate the checksums and write them to disk.
static constexpr auto checksums_path = "checksums.txt";
@ -2233,7 +2247,7 @@ CheckResults StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_
auto & part_mutable = const_cast<IMergeTreeDataPart &>(*part);
part_mutable.writeChecksums(part->checksums, local_context->getWriteSettings());
results.emplace_back(part->name, true, "Checksums recounted and written to disk.");
return CheckResult(part->name, true, "Checksums recounted and written to disk.");
}
catch (...)
{
@ -2241,7 +2255,7 @@ CheckResults StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_
throw;
tryLogCurrentException(log, __PRETTY_FUNCTION__);
results.emplace_back(part->name, false, "Check of part finished with error: '" + getCurrentExceptionMessage(false) + "'");
return CheckResult(part->name, false, "Check of part finished with error: '" + getCurrentExceptionMessage(false) + "'");
}
}
else
@ -2249,18 +2263,19 @@ CheckResults StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_
try
{
checkDataPart(part, true);
results.emplace_back(part->name, true, "");
return CheckResult(part->name, true, "");
}
catch (...)
{
if (isRetryableException(std::current_exception()))
throw;
results.emplace_back(part->name, false, getCurrentExceptionMessage(false));
return CheckResult(part->name, false, getCurrentExceptionMessage(false));
}
}
}
return results;
return {};
}

View File

@ -108,7 +108,8 @@ public:
void onActionLockRemove(StorageActionBlockType action_type) override;
CheckResults checkData(const ASTPtr & query, ContextPtr context) override;
DataValidationTasksPtr getCheckTaskList(const ASTPtr & query, ContextPtr context) override;
std::optional<CheckResult> checkDataNext(DataValidationTasksPtr & check_task_list) override;
bool scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) override;
@ -278,6 +279,32 @@ private:
friend class MergePlainMergeTreeTask;
friend class MutatePlainMergeTreeTask;
struct DataValidationTasks : public IStorage::DataValidationTasksBase
{
DataValidationTasks(DataPartsVector && parts_, ContextPtr context_)
: parts(std::move(parts_)), it(parts.begin()), context(std::move(context_))
{}
DataPartPtr next()
{
std::lock_guard lock(mutex);
if (it == parts.end())
return nullptr;
return *(it++);
}
size_t size() const override
{
std::lock_guard lock(mutex);
return std::distance(it, parts.end());
}
mutable std::mutex mutex;
DataPartsVector parts;
DataPartsVector::const_iterator it;
ContextPtr context;
};
protected:
std::map<int64_t, MutationCommands> getAlterMutationCommandsForPart(const DataPartPtr & part) const override;

View File

@ -149,8 +149,11 @@ public:
return getNested()->mayBenefitFromIndexForIn(left_in_operand, query_context, metadata_snapshot);
}
CheckResults checkData(const ASTPtr & query, ContextPtr context) override { return getNested()->checkData(query, context); }
DataValidationTasksPtr getCheckTaskList(const ASTPtr & query, ContextPtr context) override { return getNested()->getCheckTaskList(query, context); }
std::optional<CheckResult> checkDataNext(DataValidationTasksPtr & check_task_list) override { return getNested()->checkDataNext(check_task_list); }
void checkTableCanBeDropped([[ maybe_unused ]] ContextPtr query_context) const override { getNested()->checkTableCanBeDropped(query_context); }
bool storesDataOnDisk() const override { return getNested()->storesDataOnDisk(); }
Strings getDataPaths() const override { return getNested()->getDataPaths(); }
StoragePolicyPtr getStoragePolicy() const override { return getNested()->getStoragePolicy(); }

View File

@ -8598,36 +8598,46 @@ void StorageReplicatedMergeTree::enqueuePartForCheck(const String & part_name, t
part_check_thread.enqueuePart(part_name, delay_to_check_seconds);
}
CheckResults StorageReplicatedMergeTree::checkData(const ASTPtr & query, ContextPtr local_context)
IStorage::DataValidationTasksPtr StorageReplicatedMergeTree::getCheckTaskList(const ASTPtr & query, ContextPtr local_context)
{
CheckResults results;
DataPartsVector data_parts;
if (const auto & check_query = query->as<ASTCheckQuery &>(); check_query.partition)
{
String partition_id = getPartitionIDFromQuery(check_query.partition, local_context);
data_parts = getVisibleDataPartsVectorInPartition(local_context, partition_id);
}
else if (!check_query.part_name.empty())
{
auto part = getPartIfExists(check_query.part_name, {MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated});
if (!part)
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "No such data part '{}' to check in table '{}'",
check_query.part_name, getStorageID().getFullTableName());
data_parts.emplace_back(std::move(part));
}
else
data_parts = getVisibleDataPartsVector(local_context);
{
auto part_check_lock = part_check_thread.pausePartsCheck();
auto part_check_lock = part_check_thread.pausePartsCheck();
return std::make_unique<DataValidationTasks>(std::move(data_parts), std::move(part_check_lock));
}
for (auto & part : data_parts)
std::optional<CheckResult> StorageReplicatedMergeTree::checkDataNext(DataValidationTasksPtr & check_task_list)
{
if (auto part = assert_cast<DataValidationTasks *>(check_task_list.get())->next())
{
try
{
try
{
results.push_back(part_check_thread.checkPartAndFix(part->name));
}
catch (const Exception & ex)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
results.emplace_back(part->name, false, "Check of part finished with error: '" + ex.message() + "'");
}
return CheckResult(part_check_thread.checkPartAndFix(part->name));
}
catch (const Exception & ex)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
return CheckResult(part->name, false, "Check of part finished with error: '" + ex.message() + "'");
}
}
return results;
return {};
}

View File

@ -230,7 +230,8 @@ public:
/// Add a part to the queue of parts whose data you want to check in the background thread.
void enqueuePartForCheck(const String & part_name, time_t delay_to_check_seconds = 0);
CheckResults checkData(const ASTPtr & query, ContextPtr context) override;
DataValidationTasksPtr getCheckTaskList(const ASTPtr & query, ContextPtr context) override;
std::optional<CheckResult> checkDataNext(DataValidationTasksPtr & check_task_list) override;
/// Checks ability to use granularity
bool canUseAdaptiveGranularity() const override;
@ -995,6 +996,34 @@ private:
bool waitZeroCopyLockToDisappear(const ZeroCopyLock & lock, size_t milliseconds_to_wait) override;
void startupImpl(bool from_attach_thread);
struct DataValidationTasks : public IStorage::DataValidationTasksBase
{
explicit DataValidationTasks(DataPartsVector && parts_, std::unique_lock<std::mutex> && parts_check_lock_)
: parts_check_lock(std::move(parts_check_lock_)), parts(std::move(parts_)), it(parts.begin())
{}
DataPartPtr next()
{
std::lock_guard lock(mutex);
if (it == parts.end())
return nullptr;
return *(it++);
}
size_t size() const override
{
std::lock_guard lock(mutex);
return std::distance(it, parts.end());
}
std::unique_lock<std::mutex> parts_check_lock;
mutable std::mutex mutex;
DataPartsVector parts;
DataPartsVector::const_iterator it;
};
};
String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info);

View File

@ -403,16 +403,18 @@ SinkToStoragePtr StorageStripeLog::write(const ASTPtr & /*query*/, const Storage
return std::make_shared<StripeLogSink>(*this, metadata_snapshot, std::move(lock));
}
CheckResults StorageStripeLog::checkData(const ASTPtr & /* query */, ContextPtr local_context)
IStorage::DataValidationTasksPtr StorageStripeLog::getCheckTaskList(const ASTPtr & /* query */, ContextPtr local_context)
{
ReadLock lock{rwlock, getLockTimeout(local_context)};
if (!lock)
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Lock timeout exceeded");
return file_checker.check();
return std::make_unique<DataValidationTasks>(file_checker.getDataValidationTasks(), std::move(lock));
}
std::optional<CheckResult> StorageStripeLog::checkDataNext(DataValidationTasksPtr & check_task_list)
{
return file_checker.checkNextEntry(assert_cast<DataValidationTasks *>(check_task_list.get())->file_checker_tasks);
}
void StorageStripeLog::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &)
{

View File

@ -53,7 +53,8 @@ public:
void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override;
CheckResults checkData(const ASTPtr & query, ContextPtr ocal_context) override;
DataValidationTasksPtr getCheckTaskList(const ASTPtr & query, ContextPtr context) override;
std::optional<CheckResult> checkDataNext(DataValidationTasksPtr & check_task_list) override;
bool storesDataOnDisk() const override { return true; }
Strings getDataPaths() const override { return {DB::fullPath(disk, table_path)}; }
@ -93,6 +94,20 @@ private:
const DiskPtr disk;
String table_path;
struct DataValidationTasks : public IStorage::DataValidationTasksBase
{
DataValidationTasks(FileChecker::DataValidationTasksPtr file_checker_tasks_, ReadLock && lock_)
: file_checker_tasks(std::move(file_checker_tasks_)), lock(std::move(lock_))
{}
size_t size() const override { return file_checker_tasks->size(); }
FileChecker::DataValidationTasksPtr file_checker_tasks;
/// Lock to prevent table modification while checking
ReadLock lock;
};
String data_file_path;
String index_file_path;
FileChecker file_checker;

View File

@ -65,6 +65,9 @@ def get_options(i: int, upgrade_check: bool) -> str:
f"partial_result_update_duration_ms={random.randint(10, 1000)}"
)
if random.random() < 0.1:
client_options.append("optimize_trivial_approximate_count_query=1")
if client_options:
options.append(" --client-option " + " ".join(client_options))
@ -131,20 +134,17 @@ def prepare_for_hung_check(drop_databases: bool) -> bool:
# We attach gdb to clickhouse-server before running tests
# to print stacktraces of all crashes even if clickhouse cannot print it for some reason.
# However, it obstruct checking for hung queries.
# However, it obstructs checking for hung queries.
logging.info("Will terminate gdb (if any)")
call_with_retry("kill -TERM $(pidof gdb)")
call_with_retry("tail --pid=$(pidof gdb) -f /dev/null")
# Sometimes there is a message `Child process was stopped by signal 19` in logs after stopping gdb
call_with_retry(
"kill -CONT $(cat /var/run/clickhouse-server/clickhouse-server.pid)"
"kill -CONT $(cat /var/run/clickhouse-server/clickhouse-server.pid) && clickhouse client -q 'SELECT 1 FORMAT Null'"
)
# ThreadFuzzer significantly slows down server and causes false-positive hung check failures
call_with_retry("clickhouse client -q 'SYSTEM STOP THREAD FUZZER'")
call_with_retry(make_query_command("SELECT 1 FORMAT Null"))
call_with_retry(make_query_command("SYSTEM STOP THREAD FUZZER"))
# Some tests execute SYSTEM STOP MERGES or similar queries.
# It may cause some ALTERs to hang.
# Possibly we should fix tests and forbid to use such queries without specifying table.

View File

@ -0,0 +1,52 @@
<clickhouse>
<storage_configuration>
<disks>
<default>
<keep_free_space_bytes>0</keep_free_space_bytes>
</default>
<s3>
<type>s3</type>
<endpoint>https://vdimir-test2.s3.amazonaws.com/ttt/</endpoint>
<access_key_id>AKIAZURMN3FVQCQT6Y5U</access_key_id>
<secret_access_key>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</secret_access_key>
<region>eu-central-1</region>
<metadata_path>/var/lib/clickhouse/gcs/</metadata_path>
<support_batch_delete>false</support_batch_delete>
</s3>
<s3_cache>
<type>cache</type>
<disk>s3</disk>
<path>/var/lib/clickhouse/s3_cache/</path>
<max_size>10Gi</max_size>
</s3_cache>
</disks>
<policies>
<gcs_main>
<volumes>
<default>
<disk>default</disk>
<max_data_part_size_bytes>10000000</max_data_part_size_bytes>
</default>
<main>
<disk>s3_cache</disk>
</main>
</volumes>
<move_factor>0.99</move_factor>
</gcs_main>
<two_disks>
<volumes>
<default>
<disk>default</disk>
</default>
<external>
<disk>s3</disk>
</external>
</volumes>
</two_disks>
</policies>
</storage_configuration>
<allow_remove_stale_moving_parts>true</allow_remove_stale_moving_parts>
</clickhouse>

View File

@ -24,7 +24,7 @@ def test_merge_and_part_corruption(started_cluster):
node1.query(
"""
CREATE TABLE replicated_mt(date Date, id UInt32, value Int32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/replicated_mt', '{replica}') ORDER BY id
ENGINE = ReplicatedMergeTree('/clickhouse/tables/replicated_mt', '{replica}') ORDER BY id
SETTINGS cleanup_delay_period=1, cleanup_delay_period_random_add=1, cleanup_thread_preferred_points_per_iteration=0;
""".format(
replica=node1.name
@ -59,7 +59,8 @@ def test_merge_and_part_corruption(started_cluster):
# corrupt part after merge already assigned, but not started
res_opt = p.apply_async(optimize_with_delay, (1,))
node1.query(
"CHECK TABLE replicated_mt", settings={"check_query_single_value_result": 0}
"CHECK TABLE replicated_mt",
settings={"check_query_single_value_result": 0, "max_threads": 1},
)
# start merge
node1.query("SYSTEM START REPLICATION QUEUES replicated_mt")

View File

@ -1,6 +1,7 @@
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.client import QueryRuntimeException
cluster = ClickHouseCluster(__file__)
@ -78,7 +79,7 @@ def test_check_normal_table_corruption(started_cluster):
assert (
node1.query(
"CHECK TABLE non_replicated_mt PARTITION 201902",
settings={"check_query_single_value_result": 0},
settings={"check_query_single_value_result": 0, "max_threads": 1},
)
== "201902_1_1_0\t1\t\n"
)
@ -88,7 +89,7 @@ def test_check_normal_table_corruption(started_cluster):
assert (
node1.query(
"CHECK TABLE non_replicated_mt",
settings={"check_query_single_value_result": 0},
settings={"check_query_single_value_result": 0, "max_threads": 1},
).strip()
== "201902_1_1_0\t1\tChecksums recounted and written to disk."
)
@ -100,7 +101,7 @@ def test_check_normal_table_corruption(started_cluster):
assert (
node1.query(
"CHECK TABLE non_replicated_mt PARTITION 201902",
settings={"check_query_single_value_result": 0},
settings={"check_query_single_value_result": 0, "max_threads": 1},
).strip()
== "201902_1_1_0\t1\tChecksums recounted and written to disk."
)
@ -111,12 +112,12 @@ def test_check_normal_table_corruption(started_cluster):
assert node1.query(
"CHECK TABLE non_replicated_mt",
settings={"check_query_single_value_result": 0},
settings={"check_query_single_value_result": 0, "max_threads": 1},
).strip().split("\t")[0:2] == ["201902_1_1_0", "0"]
assert node1.query(
"CHECK TABLE non_replicated_mt",
settings={"check_query_single_value_result": 0},
settings={"check_query_single_value_result": 0, "max_threads": 1},
).strip().split("\t")[0:2] == ["201902_1_1_0", "0"]
node1.query(
@ -126,7 +127,7 @@ def test_check_normal_table_corruption(started_cluster):
assert (
node1.query(
"CHECK TABLE non_replicated_mt PARTITION 201901",
settings={"check_query_single_value_result": 0},
settings={"check_query_single_value_result": 0, "max_threads": 1},
)
== "201901_2_2_0\t1\t\n"
)
@ -137,7 +138,7 @@ def test_check_normal_table_corruption(started_cluster):
assert node1.query(
"CHECK TABLE non_replicated_mt PARTITION 201901",
settings={"check_query_single_value_result": 0},
settings={"check_query_single_value_result": 0, "max_threads": 1},
).strip().split("\t")[0:2] == ["201901_2_2_0", "0"]
@ -164,13 +165,15 @@ def test_check_replicated_table_simple(started_cluster):
assert (
node1.query(
"CHECK TABLE replicated_mt", settings={"check_query_single_value_result": 0}
"CHECK TABLE replicated_mt",
settings={"check_query_single_value_result": 0, "max_threads": 1},
)
== "201902_0_0_0\t1\t\n"
)
assert (
node2.query(
"CHECK TABLE replicated_mt", settings={"check_query_single_value_result": 0}
"CHECK TABLE replicated_mt",
settings={"check_query_single_value_result": 0, "max_threads": 1},
)
== "201902_0_0_0\t1\t\n"
)
@ -185,18 +188,40 @@ def test_check_replicated_table_simple(started_cluster):
assert (
node1.query(
"CHECK TABLE replicated_mt PARTITION 201901",
settings={"check_query_single_value_result": 0},
settings={"check_query_single_value_result": 0, "max_threads": 1},
)
== "201901_0_0_0\t1\t\n"
)
assert (
node2.query(
"CHECK TABLE replicated_mt PARTITION 201901",
settings={"check_query_single_value_result": 0},
settings={"check_query_single_value_result": 0, "max_threads": 1},
)
== "201901_0_0_0\t1\t\n"
)
assert sorted(
node2.query(
"CHECK TABLE replicated_mt",
settings={"check_query_single_value_result": 0},
).split("\n")
) == ["", "201901_0_0_0\t1\t", "201902_0_0_0\t1\t"]
with pytest.raises(QueryRuntimeException) as exc:
node2.query(
"CHECK TABLE replicated_mt PART '201801_0_0_0'",
settings={"check_query_single_value_result": 0},
)
assert "NO_SUCH_DATA_PART" in str(exc.value)
assert (
node2.query(
"CHECK TABLE replicated_mt PART '201902_0_0_0'",
settings={"check_query_single_value_result": 0},
)
== "201902_0_0_0\t1\t\n"
)
def test_check_replicated_table_corruption(started_cluster):
for node in [node1, node2]:
@ -229,7 +254,7 @@ def test_check_replicated_table_corruption(started_cluster):
corrupt_data_part_on_disk(node1, "replicated_mt_1", part_name)
assert node1.query(
"CHECK TABLE replicated_mt_1 PARTITION 201901",
settings={"check_query_single_value_result": 0},
settings={"check_query_single_value_result": 0, "max_threads": 1},
) == "{p}\t0\tPart {p} looks broken. Removing it and will try to fetch.\n".format(
p=part_name
)
@ -237,14 +262,14 @@ def test_check_replicated_table_corruption(started_cluster):
node1.query_with_retry("SYSTEM SYNC REPLICA replicated_mt_1")
assert node1.query(
"CHECK TABLE replicated_mt_1 PARTITION 201901",
settings={"check_query_single_value_result": 0},
settings={"check_query_single_value_result": 0, "max_threads": 1},
) == "{}\t1\t\n".format(part_name)
assert node1.query("SELECT count() from replicated_mt_1") == "4\n"
remove_part_from_disk(node2, "replicated_mt_1", part_name)
assert node2.query(
"CHECK TABLE replicated_mt_1 PARTITION 201901",
settings={"check_query_single_value_result": 0},
settings={"check_query_single_value_result": 0, "max_threads": 1},
) == "{p}\t0\tPart {p} looks broken. Removing it and will try to fetch.\n".format(
p=part_name
)
@ -252,6 +277,6 @@ def test_check_replicated_table_corruption(started_cluster):
node1.query("SYSTEM SYNC REPLICA replicated_mt_1")
assert node1.query(
"CHECK TABLE replicated_mt_1 PARTITION 201901",
settings={"check_query_single_value_result": 0},
settings={"check_query_single_value_result": 0, "max_threads": 1},
) == "{}\t1\t\n".format(part_name)
assert node1.query("SELECT count() from replicated_mt_1") == "4\n"

View File

@ -1,2 +1,3 @@
1
1
1

View File

@ -8,6 +8,12 @@ INSERT INTO check_query_tiny_log VALUES (1, 'A'), (2, 'B'), (3, 'C');
CHECK TABLE check_query_tiny_log;
CHECK TABLE check_query_tiny_log PARTITION tuple(); -- { serverError NOT_IMPLEMENTED }
CHECK TABLE check_query_tiny_log PART 'all_0_0_0'; -- { serverError NOT_IMPLEMENTED }
-- Settings and FORMAT are supported
CHECK TABLE check_query_tiny_log SETTINGS max_threads = 16;
CHECK TABLE check_query_tiny_log FORMAT Null SETTINGS max_threads = 8, check_query_single_value_result = 0;
DROP TABLE IF EXISTS check_query_log;

View File

@ -1,11 +1,17 @@
201901_1_1_0 1
========
201901_1_1_0 1
201801_1_1_0 1
201901_2_2_0 1
========
201901_1_2_1 1
201801_1_1_0 1
201901_2_2_0 1
201901_3_3_0 1
========
201901_1_2_1 1
201902_3_3_0 1
201801_1_1_1 1
201901_2_3_1 1
========
201902_3_4_1 1
201801_1_1_1 1
201901_2_3_1 1
201902_4_4_0 1
========
201902_4_5_1 1
========
201801_1_1_0 1

View File

@ -3,29 +3,31 @@ DROP TABLE IF EXISTS mt_table;
CREATE TABLE mt_table (d Date, key UInt64, data String) ENGINE = MergeTree() PARTITION BY toYYYYMM(d) ORDER BY key;
CHECK TABLE mt_table;
CHECK TABLE mt_table SETTINGS max_threads = 1;
INSERT INTO mt_table VALUES (toDate('2018-01-01'), 1, 'old');
INSERT INTO mt_table VALUES (toDate('2019-01-02'), 1, 'Hello'), (toDate('2019-01-02'), 2, 'World');
CHECK TABLE mt_table;
CHECK TABLE mt_table SETTINGS max_threads = 1;
INSERT INTO mt_table VALUES (toDate('2019-01-02'), 3, 'quick'), (toDate('2019-01-02'), 4, 'brown');
SELECT '========';
CHECK TABLE mt_table;
CHECK TABLE mt_table SETTINGS max_threads = 1;
OPTIMIZE TABLE mt_table FINAL;
SELECT '========';
CHECK TABLE mt_table;
CHECK TABLE mt_table SETTINGS max_threads = 1;
SELECT '========';
INSERT INTO mt_table VALUES (toDate('2019-02-03'), 5, '!'), (toDate('2019-02-03'), 6, '?');
CHECK TABLE mt_table;
CHECK TABLE mt_table SETTINGS max_threads = 1;
SELECT '========';
@ -33,6 +35,10 @@ INSERT INTO mt_table VALUES (toDate('2019-02-03'), 7, 'jump'), (toDate('2019-02-
OPTIMIZE TABLE mt_table FINAL;
CHECK TABLE mt_table PARTITION 201902;
CHECK TABLE mt_table PARTITION 201902 SETTINGS max_threads = 1;
SELECT '========';
CHECK TABLE mt_table PART '201801_1_1_0';
DROP TABLE IF EXISTS mt_table;

View File

@ -10,7 +10,7 @@ CREATE TABLE mt_without_pk (SomeField1 Int64, SomeField2 Double) ENGINE = MergeT
INSERT INTO mt_without_pk VALUES (1, 2);
CHECK TABLE mt_without_pk;
CHECK TABLE mt_without_pk SETTINGS max_threads = 1;
DROP TABLE IF EXISTS mt_without_pk SYNC;
@ -20,6 +20,6 @@ CREATE TABLE replicated_mt_without_pk (SomeField1 Int64, SomeField2 Double) ENGI
INSERT INTO replicated_mt_without_pk VALUES (1, 2);
CHECK TABLE replicated_mt_without_pk;
CHECK TABLE replicated_mt_without_pk SETTINGS max_threads = 1;
DROP TABLE IF EXISTS replicated_mt_without_pk SYNC;

View File

@ -7,11 +7,11 @@ CREATE TABLE check_query_test (SomeKey UInt64, SomeValue String) ENGINE = MergeT
-- Rows in this table are short, so granularity will be 8192.
INSERT INTO check_query_test SELECT number, toString(number) FROM system.numbers LIMIT 81920;
CHECK TABLE check_query_test;
CHECK TABLE check_query_test SETTINGS max_threads = 1;
OPTIMIZE TABLE check_query_test;
CHECK TABLE check_query_test;
CHECK TABLE check_query_test SETTINGS max_threads = 1;
DROP TABLE IF EXISTS check_query_test;
@ -21,18 +21,18 @@ CREATE TABLE check_query_test_non_adaptive (SomeKey UInt64, SomeValue String) EN
INSERT INTO check_query_test_non_adaptive SELECT number, toString(number) FROM system.numbers LIMIT 81920;
CHECK TABLE check_query_test_non_adaptive;
CHECK TABLE check_query_test_non_adaptive SETTINGS max_threads = 1;
OPTIMIZE TABLE check_query_test_non_adaptive;
CHECK TABLE check_query_test_non_adaptive;
CHECK TABLE check_query_test_non_adaptive SETTINGS max_threads = 1;
INSERT INTO check_query_test_non_adaptive SELECT number, toString(number) FROM system.numbers LIMIT 77;
CHECK TABLE check_query_test_non_adaptive;
CHECK TABLE check_query_test_non_adaptive SETTINGS max_threads = 1;
OPTIMIZE TABLE check_query_test_non_adaptive;
CHECK TABLE check_query_test_non_adaptive;
CHECK TABLE check_query_test_non_adaptive SETTINGS max_threads = 1;
DROP TABLE IF EXISTS check_query_test_non_adaptive;

View File

@ -10,6 +10,6 @@ CREATE TABLE check_table_with_indices (
INSERT INTO check_table_with_indices VALUES (0, 'test'), (1, 'test2');
CHECK TABLE check_table_with_indices;
CHECK TABLE check_table_with_indices SETTINGS max_threads = 1;
DROP TABLE check_table_with_indices;

View File

@ -4,12 +4,12 @@ DROP TABLE IF EXISTS check_codec;
CREATE TABLE check_codec(a Int, b Int CODEC(Delta, ZSTD)) ENGINE = MergeTree ORDER BY a SETTINGS min_bytes_for_wide_part = 0;
INSERT INTO check_codec SELECT number, number * 2 FROM numbers(1000);
CHECK TABLE check_codec;
CHECK TABLE check_codec SETTINGS max_threads = 1;
DROP TABLE check_codec;
CREATE TABLE check_codec(a Int, b Int CODEC(Delta, ZSTD)) ENGINE = MergeTree ORDER BY a SETTINGS min_bytes_for_wide_part = '10M';
INSERT INTO check_codec SELECT number, number * 2 FROM numbers(1000);
CHECK TABLE check_codec;
CHECK TABLE check_codec SETTINGS max_threads = 1;
DROP TABLE check_codec;

View File

@ -5,7 +5,7 @@ create table tp (x Int32, y Int32, projection p (select x, y order by x)) engine
insert into tp select number, number from numbers(3);
insert into tp select number, number from numbers(5);
check table tp settings check_query_single_value_result=0;
check table tp settings check_query_single_value_result=0, max_threads=1;
drop table tp;
@ -13,7 +13,7 @@ create table tp (p Date, k UInt64, v1 UInt64, v2 Int64, projection p1 ( select p
insert into tp (p, k, v1, v2) values ('2018-05-15', 1, 1000, 2000), ('2018-05-16', 2, 3000, 4000), ('2018-05-17', 3, 5000, 6000), ('2018-05-18', 4, 7000, 8000);
check table tp settings check_query_single_value_result=0;
check table tp settings check_query_single_value_result=0, max_threads=1;
drop table tp;
@ -22,5 +22,5 @@ create table tp (x int, projection p (select sum(x))) engine = MergeTree order b
insert into tp values (1), (2), (3), (4);
select part_type from system.parts where database = currentDatabase() and table = 'tp';
select part_type from system.projection_parts where database = currentDatabase() and table = 'tp';
check table tp settings check_query_single_value_result=0;
check table tp settings check_query_single_value_result=0, max_threads=1;
drop table tp;

View File

@ -12,7 +12,6 @@ SELECT name, column, serialization_kind FROM system.parts_columns
WHERE database = currentDatabase() AND table = 't_sparse_02235'
ORDER BY name, column;
SET check_query_single_value_result = 0;
CHECK TABLE t_sparse_02235;
CHECK TABLE t_sparse_02235 SETTINGS check_query_single_value_result = 0, max_threads = 1;
DROP TABLE t_sparse_02235;

View File

@ -14,7 +14,7 @@ INSERT INTO t_source_part_is_intact SELECT
if (number % 11 = 0, number, 0)
FROM numbers(2000);
CHECK TABLE t_source_part_is_intact;
CHECK TABLE t_source_part_is_intact SETTINGS max_threads = 1;
SELECT 1, count() FROM t_source_part_is_intact;
BEGIN TRANSACTION;
@ -22,18 +22,18 @@ BEGIN TRANSACTION;
ALTER TABLE t_source_part_is_intact update u = 0 where u != 0;
ROLLBACK;
CHECK TABLE t_source_part_is_intact;
CHECK TABLE t_source_part_is_intact SETTINGS max_threads = 1;
BEGIN TRANSACTION;
-- size of the file serialization.json is different in the new part
ALTER TABLE t_source_part_is_intact update u = 1 WHERE 1;
ROLLBACK;
CHECK TABLE t_source_part_is_intact;
CHECK TABLE t_source_part_is_intact SETTINGS max_threads = 1;
DETACH TABLE t_source_part_is_intact;
ATTACH TABLE t_source_part_is_intact;
CHECK TABLE t_source_part_is_intact;
CHECK TABLE t_source_part_is_intact SETTINGS max_threads = 1;
DROP TABLE t_source_part_is_intact;

View File

@ -0,0 +1,2 @@
Ok
Ok

View File

@ -0,0 +1,29 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS t0";
${CLICKHOUSE_CLIENT} -q "CREATE TABLE t0 (x UInt64, val String) ENGINE = MergeTree ORDER BY x PARTITION BY x % 100";
${CLICKHOUSE_CLIENT} -q "INSERT INTO t0 SELECT sipHash64(number), randomPrintableASCII(1000) FROM numbers(1000)";
# Check that we have at least 3 different values for read_rows
UNIQUE_VALUES=$(
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&send_progress_in_http_headers=1&http_headers_progress_interval_ms=0" -d @- <<< "CHECK TABLE t0" -v |& {
grep -F -e X-ClickHouse-Progress: -e X-ClickHouse-Summary: | grep -o '"read_rows"\s*:\s*"[0-9]*"'
} | uniq | wc -l
)
[ "$UNIQUE_VALUES" -ge "3" ] && echo "Ok" || echo "Fail: got $UNIQUE_VALUES"
# Check that we have at least 100 total_rows_to_read (at least one check task per partition)
MAX_TOTAL_VALUE=$(
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&send_progress_in_http_headers=1&http_headers_progress_interval_ms=0" -d @- <<< "CHECK TABLE t0" -v |& {
grep -F -e X-ClickHouse-Progress: -e X-ClickHouse-Summary: | grep -o '"total_rows_to_read"\s*:\s*"[0-9]*"' | grep -o '[0-9]*'
} | sort -n | tail -1
)
[ "$MAX_TOTAL_VALUE" -ge "100" ] && echo "Ok" || echo "Fail: got $MAX_TOTAL_VALUE"

View File

@ -0,0 +1 @@
121

View File

@ -0,0 +1,6 @@
-- Tags: use-rocksdb
CREATE TABLE dict (key UInt64, value String) ENGINE = EmbeddedRocksDB PRIMARY KEY key;
INSERT INTO dict SELECT number, toString(number) FROM numbers(121);
SELECT count() FROM dict SETTINGS optimize_trivial_approximate_count_query = 0, max_rows_to_read = 1; -- { serverError TOO_MANY_ROWS }
SELECT count() FROM dict SETTINGS optimize_trivial_approximate_count_query = 1, max_rows_to_read = 1;