Merge remote-tracking branch 'origin/master' into use-iobject-storage-for-table-engines-1

kssenii 2024-05-22 14:13:27 +02:00
commit 1059b51177
40 changed files with 592 additions and 159 deletions

View File

@ -68,8 +68,9 @@ if (ENABLE_CHECK_HEAVY_BUILDS)
set (RLIMIT_AS 20000000000)
endif()
# For some files currently building RISCV64 might be too slow. TODO: Improve compilation times per file
if (ARCH_RISCV64)
# For some files currently building RISCV64/LOONGARCH64 might be too slow.
# TODO: Improve compilation times per file
if (ARCH_RISCV64 OR ARCH_LOONGARCH64)
set (RLIMIT_CPU 1800)
endif()

View File

@ -998,17 +998,170 @@ SELECT version()
Returns the build ID generated by a compiler for the running ClickHouse server binary.
If executed in the context of a distributed table, this function generates a normal column with values relevant to each shard. Otherwise it produces a constant value.
## blockNumber()
## blockNumber
Returns the sequence number of the data block where the row is located.
Returns a monotonically increasing sequence number of the [block](../../development/architecture.md#block) containing the row.
The returned block number is updated on a best-effort basis, i.e. it may not be fully accurate.
## rowNumberInBlock() {#rowNumberInBlock}
**Syntax**
```sql
blockNumber()
```
**Returned value**
- Sequence number of the data block where the row is located. [UInt64](../data-types/int-uint.md).
**Example**
Query:
```sql
SELECT blockNumber()
FROM
(
SELECT *
FROM system.numbers
LIMIT 10
) SETTINGS max_block_size = 2
```
Result:
```response
┌─blockNumber()─┐
│ 7 │
│ 7 │
└───────────────┘
┌─blockNumber()─┐
│ 8 │
│ 8 │
└───────────────┘
┌─blockNumber()─┐
│ 9 │
│ 9 │
└───────────────┘
┌─blockNumber()─┐
│ 10 │
│ 10 │
└───────────────┘
┌─blockNumber()─┐
│ 11 │
│ 11 │
└───────────────┘
```
## rowNumberInBlock {#rowNumberInBlock}
For each [block](../../development/architecture.md#block) processed by `rowNumberInBlock`, returns the number of the current row.
The returned number starts at 0 for each block.
**Syntax**
```sql
rowNumberInBlock()
```
**Returned value**
- Ordinal number of the row in the data block starting from 0. [UInt64](../data-types/int-uint.md).
**Example**
Query:
```sql
SELECT rowNumberInBlock()
FROM
(
SELECT *
FROM system.numbers_mt
LIMIT 10
) SETTINGS max_block_size = 2
```
Result:
```response
┌─rowNumberInBlock()─┐
│ 0 │
│ 1 │
└────────────────────┘
┌─rowNumberInBlock()─┐
│ 0 │
│ 1 │
└────────────────────┘
┌─rowNumberInBlock()─┐
│ 0 │
│ 1 │
└────────────────────┘
┌─rowNumberInBlock()─┐
│ 0 │
│ 1 │
└────────────────────┘
┌─rowNumberInBlock()─┐
│ 0 │
│ 1 │
└────────────────────┘
```
## rowNumberInAllBlocks
Returns a unique row number for each row processed by `rowNumberInAllBlocks`. The returned numbers start at 0. When blocks are produced by multiple threads (as with `system.numbers_mt` in the example below), the blocks may be processed in any order, so the numbers are unique but not necessarily returned in ascending order.
**Syntax**
```sql
rowNumberInAllBlocks()
```
**Returned value**
- Ordinal number of the row across all processed data blocks, starting from 0. [UInt64](../data-types/int-uint.md).
**Example**
Query:
```sql
SELECT rowNumberInAllBlocks()
FROM
(
SELECT *
FROM system.numbers_mt
LIMIT 10
)
SETTINGS max_block_size = 2
```
Result:
```response
┌─rowNumberInAllBlocks()─┐
│ 0 │
│ 1 │
└────────────────────────┘
┌─rowNumberInAllBlocks()─┐
│ 4 │
│ 5 │
└────────────────────────┘
┌─rowNumberInAllBlocks()─┐
│ 2 │
│ 3 │
└────────────────────────┘
┌─rowNumberInAllBlocks()─┐
│ 6 │
│ 7 │
└────────────────────────┘
┌─rowNumberInAllBlocks()─┐
│ 8 │
│ 9 │
└────────────────────────┘
```
Returns the ordinal number of the row in the data block. Different data blocks are always recalculated.
## rowNumberInAllBlocks()
Returns the ordinal number of the row in the data block. This function only considers the affected data blocks.
## neighbor

View File

@ -14,7 +14,7 @@ struct Settings;
namespace ErrorCodes
{
extern const int CORRUPTED_DATA;
extern const int INCORRECT_DATA;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int LOGICAL_ERROR;
}
@ -198,7 +198,7 @@ public:
this->data(place).value().read(buf, *serialization_val, arena);
if (unlikely(this->data(place).value().has() != this->data(place).result().has()))
throw Exception(
ErrorCodes::CORRUPTED_DATA,
ErrorCodes::INCORRECT_DATA,
"Invalid state of the aggregate function {}: has_value ({}) != has_result ({})",
getName(),
this->data(place).value().has(),

View File

@ -42,7 +42,7 @@ private:
return;
const auto & storage = table_node ? table_node->getStorage() : table_function_node->getStorage();
bool is_final_supported = storage && storage->supportsFinal();
bool is_final_supported = storage && !storage->isRemote() && storage->supportsFinal();
if (!is_final_supported)
return;

View File

@ -192,7 +192,7 @@ void QueryTreePassManager::run(QueryTreeNodePtr query_tree_node)
void QueryTreePassManager::runOnlyResolve(QueryTreeNodePtr query_tree_node)
{
// Run only QueryAnalysisPass, GroupingFunctionsResolvePass and AutoFinalOnQueryPass passes.
run(query_tree_node, 2);
run(query_tree_node, 3);
}
void QueryTreePassManager::run(QueryTreeNodePtr query_tree_node, size_t up_to_pass_index)
@ -249,6 +249,7 @@ void addQueryTreePasses(QueryTreePassManager & manager, bool only_analyze)
{
manager.addPass(std::make_unique<QueryAnalysisPass>(only_analyze));
manager.addPass(std::make_unique<GroupingFunctionsResolvePass>());
manager.addPass(std::make_unique<AutoFinalOnQueryPass>());
manager.addPass(std::make_unique<RemoveUnusedProjectionColumnsPass>());
manager.addPass(std::make_unique<FunctionToSubcolumnsPass>());
@ -294,7 +295,6 @@ void addQueryTreePasses(QueryTreePassManager & manager, bool only_analyze)
manager.addPass(std::make_unique<LogicalExpressionOptimizerPass>());
manager.addPass(std::make_unique<AutoFinalOnQueryPass>());
manager.addPass(std::make_unique<CrossToInnerJoinPass>());
manager.addPass(std::make_unique<ShardNumColumnToFunctionPass>());

View File

@ -80,6 +80,7 @@ class IColumn;
M(UInt64, connections_with_failover_max_tries, 3, "The maximum number of attempts to connect to replicas.", 0) \
M(UInt64, s3_strict_upload_part_size, 0, "The exact size of part to upload during multipart upload to S3 (some implementations do not support variable size parts).", 0) \
M(UInt64, azure_strict_upload_part_size, 0, "The exact size of part to upload during multipart upload to Azure blob storage.", 0) \
M(UInt64, azure_max_blocks_in_multipart_upload, 50000, "Maximum number of blocks in multipart upload for Azure.", 0) \
M(UInt64, s3_min_upload_part_size, 16*1024*1024, "The minimum size of part to upload during multipart upload to S3.", 0) \
M(UInt64, s3_max_upload_part_size, 5ull*1024*1024*1024, "The maximum size of part to upload during multipart upload to S3.", 0) \
M(UInt64, azure_min_upload_part_size, 16*1024*1024, "The minimum size of part to upload during multipart upload to Azure blob storage.", 0) \

View File

@ -100,6 +100,7 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{"azure_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in AzureBlobStorage table engine"},
{"s3_ignore_file_doesnt_exist", false, false, "Allow to return 0 rows when the requested files don't exist instead of throwing an exception in S3 table engine"},
{"input_format_force_null_for_omitted_fields", false, false, "Disable type-defaults for omitted fields when needed"},
{"azure_max_blocks_in_multipart_upload", 50000, 50000, "Maximum number of blocks in multipart upload for Azure."},
}},
{"24.4", {{"input_format_json_throw_on_bad_escape_sequence", true, true, "Allow to save JSON strings with bad escape sequences"},
{"max_parsing_threads", 0, 0, "Add a separate setting to control number of threads in parallel parsing from files"},

View File

@ -92,7 +92,7 @@ void applySettingsQuirks(Settings & settings, LoggerPtr log)
void doSettingsSanityCheckClamp(Settings & current_settings, LoggerPtr log)
{
auto getCurrentValue = [&current_settings](const std::string_view name) -> Field
auto get_current_value = [&current_settings](const std::string_view name) -> Field
{
Field current_value;
bool has_current_value = current_settings.tryGet(name, current_value);
@ -100,7 +100,7 @@ void doSettingsSanityCheckClamp(Settings & current_settings, LoggerPtr log)
return current_value;
};
UInt64 max_threads = getCurrentValue("max_threads").get<UInt64>();
UInt64 max_threads = get_current_value("max_threads").get<UInt64>();
UInt64 max_threads_max_value = 256 * getNumberOfPhysicalCPUCores();
if (max_threads > max_threads_max_value)
{
@ -109,7 +109,7 @@ void doSettingsSanityCheckClamp(Settings & current_settings, LoggerPtr log)
current_settings.set("max_threads", max_threads_max_value);
}
constexpr UInt64 max_sane_block_rows_size = 4294967296; // 2^32
static constexpr UInt64 max_sane_block_rows_size = 4294967296; // 2^32
std::unordered_set<String> block_rows_settings{
"max_block_size",
"max_insert_block_size",
@ -120,13 +120,21 @@ void doSettingsSanityCheckClamp(Settings & current_settings, LoggerPtr log)
"input_format_parquet_max_block_size"};
for (auto const & setting : block_rows_settings)
{
auto block_size = getCurrentValue(setting).get<UInt64>();
if (block_size > max_sane_block_rows_size)
if (auto block_size = get_current_value(setting).get<UInt64>();
block_size > max_sane_block_rows_size)
{
if (log)
LOG_WARNING(log, "Sanity check: '{}' value is too high ({}). Reduced to {}", setting, block_size, max_sane_block_rows_size);
current_settings.set(setting, max_sane_block_rows_size);
}
}
if (auto max_block_size = get_current_value("max_block_size").get<UInt64>(); max_block_size == 0)
{
if (log)
LOG_WARNING(log, "Sanity check: 'max_block_size' cannot be 0. Set to default value {}", DEFAULT_BLOCK_SIZE);
current_settings.set("max_block_size", DEFAULT_BLOCK_SIZE);
}
}
}

View File

@ -76,6 +76,20 @@ static void setReplicatedEngine(ASTCreateQuery * create_query, ContextPtr contex
String replica_path = server_settings.default_replica_path;
String replica_name = server_settings.default_replica_name;
/// Check that replica path doesn't exist
Macros::MacroExpansionInfo info;
StorageID table_id = StorageID(create_query->getDatabase(), create_query->getTable(), create_query->uuid);
info.table_id = table_id;
info.expand_special_macros_only = false;
String zookeeper_path = context->getMacros()->expand(replica_path, info);
if (context->getZooKeeper()->exists(zookeeper_path))
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Found existing ZooKeeper path {} while trying to convert table {} to replicated. Table will not be converted.",
zookeeper_path, backQuote(table_id.getFullTableName())
);
auto args = std::make_shared<ASTExpressionList>();
args->children.push_back(std::make_shared<ASTLiteral>(replica_path));
args->children.push_back(std::make_shared<ASTLiteral>(replica_name));
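For readers unfamiliar with ClickHouse macro expansion, the following rough sketch (a hypothetical `expandMacros` helper, not the `Macros` class used above) shows how the new `default_replica_path` from this commit, `/clickhouse/'/{database}/{table}/{uuid}`, becomes the concrete ZooKeeper path whose existence is checked. The database and table names are taken from the new integration test further down; the UUID is made up.
```cpp
// Rough illustration only; the real expansion is done by ClickHouse's Macros class.
#include <iostream>
#include <map>
#include <string>

static std::string expandMacros(std::string path, const std::map<std::string, std::string> & macros)
{
    for (const auto & [name, value] : macros)
    {
        const std::string token = "{" + name + "}";
        for (size_t pos = path.find(token); pos != std::string::npos; pos = path.find(token, pos + value.size()))
            path.replace(pos, token.size(), value);
    }
    return path;
}

int main()
{
    std::string zookeeper_path = expandMacros(
        "/clickhouse/'/{database}/{table}/{uuid}",
        {{"database", "modify_engine_zk_path"},
         {"table", "already_exists_1"},
         {"uuid", "2f3d5e1a-0000-4000-8000-000000000000"}});  // hypothetical UUID
    // If this path already exists in ZooKeeper, the conversion above is aborted with an exception.
    std::cout << zookeeper_path << '\n';
}
```
If another table has already registered the same path (as `already_exists_2` does in the new test), the conversion now fails early with an exception instead of proceeding, which is exactly what the test asserts.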

View File

@ -721,11 +721,10 @@ public:
if (!block.checkCheckSum())
{
std::string calculated_check_sum = std::to_string(block.calculateCheckSum());
std::string check_sum = std::to_string(block.getCheckSum());
std::string expected_check_sum = std::to_string(block.getCheckSum());
throw Exception(ErrorCodes::CORRUPTED_DATA,
"Cache data corrupted. Checksum validation failed. Calculated {} in block {}",
calculated_check_sum,
check_sum);
"Cache data corrupted. Checksum validation failed. Calculated {} expected in block {}, in file {}",
calculated_check_sum, expected_check_sum, file_path);
}
func(blocks_to_fetch[block_to_fetch_index], block.getBlockData());

View File

@ -257,6 +257,7 @@ std::unique_ptr<AzureObjectStorageSettings> getAzureBlobStorageSettings(const Po
settings->max_upload_part_size = config.getUInt64(config_prefix + ".max_upload_part_size", context->getSettings().azure_max_upload_part_size);
settings->max_single_part_copy_size = config.getUInt64(config_prefix + ".max_single_part_copy_size", context->getSettings().azure_max_single_part_copy_size);
settings->use_native_copy = config.getBool(config_prefix + ".use_native_copy", false);
settings->max_blocks_in_multipart_upload = config.getUInt64(config_prefix + ".max_blocks_in_multipart_upload", 50000);
settings->max_unexpected_write_error_retries = config.getUInt64(config_prefix + ".max_unexpected_write_error_retries", context->getSettings().azure_max_unexpected_write_error_retries);
settings->max_inflight_parts_for_one_file = config.getUInt64(config_prefix + ".max_inflight_parts_for_one_file", context->getSettings().azure_max_inflight_parts_for_one_file);
settings->strict_upload_part_size = config.getUInt64(config_prefix + ".strict_upload_part_size", context->getSettings().azure_strict_upload_part_size);

View File

@ -63,6 +63,7 @@ struct AzureObjectStorageSettings
bool use_native_copy = false;
size_t max_unexpected_write_error_retries = 4;
size_t max_inflight_parts_for_one_file = 20;
size_t max_blocks_in_multipart_upload = 50000;
size_t strict_upload_part_size = 0;
size_t upload_part_size_multiply_factor = 2;
size_t upload_part_size_multiply_parts_count_threshold = 500;

View File

@ -305,7 +305,6 @@ void registerAzureObjectStorage(ObjectStorageFactory & factory)
bool /* skip_access_check */) -> ObjectStoragePtr
{
AzureBlobStorageEndpoint endpoint = processAzureBlobStorageEndpoint(config, config_prefix);
std::string endpoint_string = endpoint.getEndpoint();
return createObjectStorage<AzureObjectStorage>(
ObjectStorageType::Azure, config, config_prefix, name,

View File

@ -31,7 +31,7 @@ extract_into_parent_list(clickhouse_functions_headers dbms_headers
add_library(clickhouse_functions_obj OBJECT ${clickhouse_functions_headers} ${clickhouse_functions_sources})
if (OMIT_HEAVY_DEBUG_SYMBOLS)
target_compile_options(clickhouse_functions_obj PRIVATE "-g0")
set_source_files_properties(${DBMS_FUNCTIONS} PROPERTIES COMPILE_FLAGS "-g0")
set_source_files_properties(${DBMS_FUNCTIONS} DIRECTORY .. PROPERTIES COMPILE_FLAGS "-g0")
endif()
list (APPEND OBJECT_LIBS $<TARGET_OBJECTS:clickhouse_functions_obj>)

View File

@ -30,6 +30,7 @@ namespace ErrorCodes
{
extern const int INVALID_CONFIG_PARAMETER;
extern const int AZURE_BLOB_STORAGE_ERROR;
extern const int LOGICAL_ERROR;
}
namespace
@ -94,11 +95,56 @@ namespace
void calculatePartSize()
{
auto max_upload_part_size = settings->max_upload_part_size;
if (!max_upload_part_size)
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "max_upload_part_size must not be 0");
if (!total_size)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Chosen multipart upload for an empty file. This must not happen");
auto max_part_number = settings->max_blocks_in_multipart_upload;
const auto min_upload_part_size = settings->min_upload_part_size;
const auto max_upload_part_size = settings->max_upload_part_size;
if (!max_part_number)
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "max_blocks_in_multipart_upload must not be 0");
else if (!min_upload_part_size)
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "min_upload_part_size must not be 0");
else if (max_upload_part_size < min_upload_part_size)
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "max_upload_part_size must not be less than min_upload_part_size");
size_t part_size = min_upload_part_size;
auto num_parts = (total_size + part_size - 1) / part_size;
if (num_parts > max_part_number)
{
part_size = (total_size + max_part_number - 1) / max_part_number;
num_parts = (total_size + part_size - 1) / part_size;
}
if (part_size > max_upload_part_size)
{
part_size = max_upload_part_size;
num_parts = (total_size + part_size - 1) / part_size;
}
String error;
if (num_parts < 1)
error = "Number of parts is zero";
else if (num_parts > max_part_number)
error = fmt::format("Number of parts exceeds {}/{}", num_parts, max_part_number);
else if (part_size < min_upload_part_size)
error = fmt::format("Size of a part is less than {}/{}", part_size, min_upload_part_size);
else if (part_size > max_upload_part_size)
error = fmt::format("Size of a part exceeds {}/{}", part_size, max_upload_part_size);
if (!error.empty())
{
throw Exception(
ErrorCodes::INVALID_CONFIG_PARAMETER,
"{} while writing {} bytes to Azure. Check max_part_number = {}, "
"min_upload_part_size = {}, max_upload_part_size = {}",
error, total_size, max_part_number, min_upload_part_size, max_upload_part_size);
}
/// We've calculated the size of a normal part (the final part can be smaller).
normal_part_size = max_upload_part_size;
normal_part_size = part_size;
}
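To make the arithmetic in `calculatePartSize` easier to follow, here is a minimal standalone sketch of the same clamping scheme (an illustration only, not the ClickHouse implementation): start from `min_upload_part_size`, grow the part size if the block count would exceed `max_blocks_in_multipart_upload`, cap it at `max_upload_part_size`, and reject configurations that cannot satisfy all three limits. The sample sizes in `main` are assumptions chosen to mirror the defaults visible in this diff.
```cpp
// Standalone illustration of the part-size selection logic above; not the ClickHouse code itself.
#include <cstdint>
#include <iostream>
#include <stdexcept>

struct PartLayout { uint64_t part_size = 0; uint64_t num_parts = 0; };

PartLayout choosePartSize(uint64_t total_size, uint64_t min_part, uint64_t max_part, uint64_t max_parts)
{
    if (!max_parts || !min_part || max_part < min_part)
        throw std::invalid_argument("inconsistent upload settings");

    uint64_t part_size = min_part;                                  /// start from the smallest allowed part
    uint64_t num_parts = (total_size + part_size - 1) / part_size;  /// ceil division

    if (num_parts > max_parts)                                      /// too many parts -> grow the part size
    {
        part_size = (total_size + max_parts - 1) / max_parts;
        num_parts = (total_size + part_size - 1) / part_size;
    }
    if (part_size > max_part)                                       /// ...but never beyond the hard cap
    {
        part_size = max_part;
        num_parts = (total_size + part_size - 1) / part_size;
    }
    if (num_parts < 1 || num_parts > max_parts || part_size < min_part || part_size > max_part)
        throw std::runtime_error("upload does not fit into the configured limits");
    return {part_size, num_parts};
}

int main()
{
    /// 1 GiB object, 16 MiB minimum part, 50000-block limit (values seen in this diff);
    /// the 5 GiB maximum part size is an assumed cap for the example.
    auto layout = choosePartSize(1ULL << 30, 16ULL << 20, 5ULL << 30, 50000);
    std::cout << "part_size=" << layout.part_size << " num_parts=" << layout.num_parts << '\n';  /// 16777216 / 64
}
```
For this sample input the minimum part size already keeps the upload well under the block limit (64 parts of 16 MiB), so neither adjustment fires; the adjustments only matter for very large objects or very small configured part sizes.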
public:
@ -219,21 +265,22 @@ namespace
auto block_blob_client = client->GetBlockBlobClient(dest_blob);
auto read_buffer = std::make_unique<LimitSeekableReadBuffer>(create_read_buffer(), task.part_offset, task.part_size);
while (!read_buffer->eof())
{
auto size = read_buffer->available();
if (size > 0)
{
auto block_id = getRandomASCIIString(64);
Azure::Core::IO::MemoryBodyStream memory(reinterpret_cast<const uint8_t *>(read_buffer->position()), size);
block_blob_client.StageBlock(block_id, memory);
task.block_ids.emplace_back(block_id);
read_buffer->ignore(size);
LOG_TRACE(log, "Writing part. Container: {}, Blob: {}, block_id: {}", dest_container_for_logging, dest_blob, block_id);
}
}
std::lock_guard lock(bg_tasks_mutex); /// Protect bg_tasks from race
LOG_TRACE(log, "Writing part finished. Container: {}, Blob: {}, Parts: {}", dest_container_for_logging, dest_blob, bg_tasks.size());
/// task.part_size is already normalized according to min_upload_part_size and max_upload_part_size.
size_t size_to_stage = task.part_size;
PODArray<char> memory;
memory.resize(size_to_stage);
WriteBufferFromVector<PODArray<char>> wb(memory);
copyData(*read_buffer, wb, size_to_stage);
Azure::Core::IO::MemoryBodyStream stream(reinterpret_cast<const uint8_t *>(memory.data()), size_to_stage);
const auto & block_id = task.block_ids.emplace_back(getRandomASCIIString(64));
block_blob_client.StageBlock(block_id, stream);
LOG_TRACE(log, "Writing part. Container: {}, Blob: {}, block_id: {}, size: {}",
dest_container_for_logging, dest_blob, block_id, size_to_stage);
}
@ -300,21 +347,32 @@ void copyAzureBlobStorageFile(
if (size < settings->max_single_part_copy_size)
{
LOG_TRACE(getLogger("copyAzureBlobStorageFile"), "Copy blob sync {} -> {}", src_blob, dest_blob);
block_blob_client_dest.CopyFromUri(source_uri);
}
else
{
Azure::Storage::Blobs::StartBlobCopyOperation operation = block_blob_client_dest.StartCopyFromUri(source_uri);
// Wait for the operation to finish, checking the status every 100 milliseconds.
auto copy_response = operation.PollUntilDone(std::chrono::milliseconds(100));
auto properties_model = copy_response.Value;
if (properties_model.CopySource.HasValue())
{
throw Exception(ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Copy failed");
}
auto copy_status = properties_model.CopyStatus;
auto copy_status_description = properties_model.CopyStatusDescription;
if (copy_status.HasValue() && copy_status.Value() == Azure::Storage::Blobs::Models::CopyStatus::Success)
{
LOG_TRACE(getLogger("copyAzureBlobStorageFile"), "Copy of {} to {} finished", properties_model.CopySource.Value(), dest_blob);
}
else
{
if (copy_status.HasValue())
throw Exception(ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Copy from {} to {} failed with status {} description {} (operation is done {})",
src_blob, dest_blob, copy_status.Value().ToString(), copy_status_description.Value(), operation.IsDone());
else
throw Exception(ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Copy from {} to {} didn't complete with success status (operation is done {})", src_blob, dest_blob, operation.IsDone());
}
}
}
else
@ -322,8 +380,8 @@ void copyAzureBlobStorageFile(
LOG_TRACE(&Poco::Logger::get("copyAzureBlobStorageFile"), "Reading from Container: {}, Blob: {}", src_container_for_logging, src_blob);
auto create_read_buffer = [&]
{
return std::make_unique<ReadBufferFromAzureBlobStorage>(src_client, src_blob, read_settings, settings->max_single_read_retries,
settings->max_single_download_retries);
return std::make_unique<ReadBufferFromAzureBlobStorage>(
src_client, src_blob, read_settings, settings->max_single_read_retries, settings->max_single_download_retries);
};
UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_container_for_logging, dest_blob, settings, schedule, &Poco::Logger::get("copyAzureBlobStorageFile")};

View File

@ -316,23 +316,23 @@ namespace
num_parts = (total_size + part_size - 1) / part_size;
}
if (num_parts < 1 || num_parts > max_part_number || part_size < min_upload_part_size || part_size > max_upload_part_size)
{
String msg;
if (num_parts < 1)
msg = "Number of parts is zero";
else if (num_parts > max_part_number)
msg = fmt::format("Number of parts exceeds {}", num_parts, max_part_number);
else if (part_size < min_upload_part_size)
msg = fmt::format("Size of a part is less than {}", part_size, min_upload_part_size);
else
msg = fmt::format("Size of a part exceeds {}", part_size, max_upload_part_size);
String error;
if (num_parts < 1)
error = "Number of parts is zero";
else if (num_parts > max_part_number)
error = fmt::format("Number of parts exceeds {}/{}", num_parts, max_part_number);
else if (part_size < min_upload_part_size)
error = fmt::format("Size of a part is less than {}/{}", part_size, min_upload_part_size);
else if (part_size > max_upload_part_size)
error = fmt::format("Size of a part exceeds {}/{}", part_size, max_upload_part_size);
if (!error.empty())
{
throw Exception(
ErrorCodes::INVALID_CONFIG_PARAMETER,
"{} while writing {} bytes to S3. Check max_part_number = {}, "
"min_upload_part_size = {}, max_upload_part_size = {}",
msg, total_size, max_part_number, min_upload_part_size, max_upload_part_size);
error, total_size, max_part_number, min_upload_part_size, max_upload_part_size);
}
/// We've calculated the size of a normal part (the final part can be smaller).

View File

@ -462,8 +462,8 @@ void ThreadStatus::initGlobalProfiler([[maybe_unused]] UInt64 global_profiler_re
{
#if !defined(SANITIZER) && !defined(__APPLE__)
/// profilers are useless without trace collector
auto global_context_ptr = global_context.lock();
if (!global_context_ptr || !global_context_ptr->hasTraceCollector())
auto context = Context::getGlobalContextInstance();
if (!context->hasTraceCollector())
return;
try

View File

@ -103,7 +103,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED;
extern const int QUERY_WAS_CANCELLED;
extern const int INCORRECT_DATA;
extern const int SYNTAX_ERROR;
extern const int SUPPORT_IS_DISABLED;
extern const int INCORRECT_QUERY;
@ -1256,34 +1255,6 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
}
}
}
// Here we check if our projections contain force_optimize_projection_name
if (!settings.force_optimize_projection_name.value.empty())
{
bool found = false;
std::set<std::string> projections;
{
const auto & access_info = context->getQueryAccessInfo();
std::lock_guard lock(access_info.mutex);
projections = access_info.projections;
}
for (const auto &projection : projections)
{
// projection value has structure like: <db_name>.<table_name>.<projection_name>
// We need to get only the projection name
size_t last_dot_pos = projection.find_last_of('.');
std::string projection_name = (last_dot_pos != std::string::npos) ? projection.substr(last_dot_pos + 1) : projection;
if (settings.force_optimize_projection_name.value == projection_name)
{
found = true;
break;
}
}
if (!found)
throw Exception(ErrorCodes::INCORRECT_DATA, "Projection {} is specified in setting force_optimize_projection_name but not used",
settings.force_optimize_projection_name.value);
}
if (process_list_entry)
{
@ -1421,7 +1392,16 @@ void executeQuery(
const char * begin;
const char * end;
istr.nextIfAtEnd();
try
{
istr.nextIfAtEnd();
}
catch (...)
{
/// If the buffer contains invalid data and we fail to decompress it, we still want to have some information about the query in the log.
logQuery("<cannot parse>", context, /* internal = */ false, QueryProcessingStage::Complete);
throw;
}
size_t max_query_size = context->getSettingsRef().max_query_size;

View File

@ -111,8 +111,11 @@ void optimizePrimaryKeyCondition(const Stack & stack);
void optimizePrewhere(Stack & stack, QueryPlan::Nodes & nodes);
void optimizeReadInOrder(QueryPlan::Node & node, QueryPlan::Nodes & nodes);
void optimizeAggregationInOrder(QueryPlan::Node & node, QueryPlan::Nodes &);
bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes, bool allow_implicit_projections);
bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes);
/// Returns the name of used projection or nullopt if no projection is used.
std::optional<String> optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes, bool allow_implicit_projections);
std::optional<String> optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes);
bool addPlansForSets(QueryPlan & plan, QueryPlan::Node & node, QueryPlan::Nodes & nodes);
/// Enable memory bound merging of aggregation states for remote queries

View File

@ -46,7 +46,7 @@ QueryPlanOptimizationSettings QueryPlanOptimizationSettings::fromSettings(const
settings.optimize_projection = from.optimize_use_projections;
settings.force_use_projection = settings.optimize_projection && from.force_optimize_projection;
settings.force_projection_name = from.force_optimize_projection_name;
settings.force_projection_name = settings.optimize_projection ? from.force_optimize_projection_name.value : "";
settings.optimize_use_implicit_projections = settings.optimize_projection && from.optimize_use_implicit_projections;
return settings;

View File

@ -12,6 +12,7 @@ namespace DB
namespace ErrorCodes
{
extern const int INCORRECT_DATA;
extern const int TOO_MANY_QUERY_PLAN_OPTIMIZATIONS;
extern const int PROJECTION_NOT_USED;
}
@ -106,7 +107,7 @@ void optimizeTreeFirstPass(const QueryPlanOptimizationSettings & settings, Query
void optimizeTreeSecondPass(const QueryPlanOptimizationSettings & optimization_settings, QueryPlan::Node & root, QueryPlan::Nodes & nodes)
{
const size_t max_optimizations_to_apply = optimization_settings.max_optimizations_to_apply;
size_t num_applied_projection = 0;
std::unordered_set<String> applied_projection_names;
bool has_reading_from_mt = false;
Stack stack;
@ -159,9 +160,11 @@ void optimizeTreeSecondPass(const QueryPlanOptimizationSettings & optimization_s
/// Projection optimization relies on PK optimization
if (optimization_settings.optimize_projection)
num_applied_projection
+= optimizeUseAggregateProjections(*frame.node, nodes, optimization_settings.optimize_use_implicit_projections);
{
auto applied_projection = optimizeUseAggregateProjections(*frame.node, nodes, optimization_settings.optimize_use_implicit_projections);
if (applied_projection)
applied_projection_names.insert(*applied_projection);
}
if (optimization_settings.aggregation_in_order)
optimizeAggregationInOrder(*frame.node, nodes);
@ -180,11 +183,11 @@ void optimizeTreeSecondPass(const QueryPlanOptimizationSettings & optimization_s
if (optimization_settings.optimize_projection)
{
/// Projection optimization relies on PK optimization
if (optimizeUseNormalProjections(stack, nodes))
if (auto applied_projection = optimizeUseNormalProjections(stack, nodes))
{
++num_applied_projection;
applied_projection_names.insert(*applied_projection);
if (max_optimizations_to_apply && max_optimizations_to_apply < num_applied_projection)
if (max_optimizations_to_apply && max_optimizations_to_apply < applied_projection_names.size())
throw Exception(ErrorCodes::TOO_MANY_QUERY_PLAN_OPTIMIZATIONS,
"Too many projection optimizations applied to query plan. Current limit {}",
max_optimizations_to_apply);
@ -201,10 +204,16 @@ void optimizeTreeSecondPass(const QueryPlanOptimizationSettings & optimization_s
stack.pop_back();
}
if (optimization_settings.force_use_projection && has_reading_from_mt && num_applied_projection == 0)
if (optimization_settings.force_use_projection && has_reading_from_mt && applied_projection_names.empty())
throw Exception(
ErrorCodes::PROJECTION_NOT_USED,
"No projection is used when optimize_use_projections = 1 and force_optimize_projection = 1");
if (!optimization_settings.force_projection_name.empty() && has_reading_from_mt && !applied_projection_names.contains(optimization_settings.force_projection_name))
throw Exception(
ErrorCodes::INCORRECT_DATA,
"Projection {} is specified in setting force_optimize_projection_name but not used",
optimization_settings.force_projection_name);
}
void optimizeTreeThirdPass(QueryPlan & plan, QueryPlan::Node & root, QueryPlan::Nodes & nodes)

View File

@ -552,28 +552,28 @@ static QueryPlan::Node * findReadingStep(QueryPlan::Node & node)
return nullptr;
}
bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes, bool allow_implicit_projections)
std::optional<String> optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes, bool allow_implicit_projections)
{
if (node.children.size() != 1)
return false;
return {};
auto * aggregating = typeid_cast<AggregatingStep *>(node.step.get());
if (!aggregating)
return false;
return {};
if (!aggregating->canUseProjection())
return false;
return {};
QueryPlan::Node * reading_node = findReadingStep(*node.children.front());
if (!reading_node)
return false;
return {};
auto * reading = typeid_cast<ReadFromMergeTree *>(reading_node->step.get());
if (!reading)
return false;
return {};
if (!canUseProjectionForReadingStep(reading))
return false;
return {};
std::shared_ptr<PartitionIdToMaxBlock> max_added_blocks = getMaxAddedBlocks(reading);
@ -597,7 +597,7 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes &
if (ordinary_reading_marks == 0)
{
reading->setAnalyzedResult(std::move(ordinary_reading_select_result));
return false;
return {};
}
const auto & parts_with_ranges = ordinary_reading_select_result->parts_with_ranges;
@ -631,15 +631,14 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes &
if (!best_candidate)
{
reading->setAnalyzedResult(std::move(ordinary_reading_select_result));
return false;
return {};
}
}
else
{
return false;
return {};
}
Context::QualifiedProjectionName projection_name;
chassert(best_candidate != nullptr);
QueryPlanStepPtr projection_reading;
@ -654,12 +653,6 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes &
Pipe pipe(std::make_shared<SourceFromSingleChunk>(std::move(candidates.minmax_projection->block)));
projection_reading = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
has_ordinary_parts = false;
projection_name = Context::QualifiedProjectionName
{
.storage_id = reading->getMergeTreeData().getStorageID(),
.projection_name = candidates.minmax_projection->candidate.projection->name,
};
}
else
{
@ -691,12 +684,6 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes &
projection_reading = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
}
projection_name = Context::QualifiedProjectionName
{
.storage_id = reading->getMergeTreeData().getStorageID(),
.projection_name = best_candidate->projection->name,
};
has_ordinary_parts = best_candidate->merge_tree_ordinary_select_result_ptr != nullptr;
if (has_ordinary_parts)
reading->setAnalyzedResult(std::move(best_candidate->merge_tree_ordinary_select_result_ptr));
@ -746,7 +733,7 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes &
node.children.push_back(&expr_or_filter_node);
}
return true;
return best_candidate->projection->name;
}
}

View File

@ -73,16 +73,16 @@ static bool hasAllRequiredColumns(const ProjectionDescription * projection, cons
}
bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
std::optional<String> optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
{
const auto & frame = stack.back();
auto * reading = typeid_cast<ReadFromMergeTree *>(frame.node->step.get());
if (!reading)
return false;
return {};
if (!canUseProjectionForReadingStep(reading))
return false;
return {};
auto iter = stack.rbegin();
while (std::next(iter) != stack.rend())
@ -96,7 +96,7 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
/// Dangling query plan node. This might be generated by StorageMerge.
if (iter->node->step.get() == reading)
return false;
return {};
const auto metadata = reading->getStorageMetadata();
const auto & projections = metadata->projections;
@ -107,7 +107,7 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
normal_projections.push_back(&projection);
if (normal_projections.empty())
return false;
return {};
ContextPtr context = reading->getContext();
auto it = std::find_if(normal_projections.begin(), normal_projections.end(), [&](const auto * projection)
@ -126,7 +126,7 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
{
auto & child = iter->node->children[iter->next_child - 1];
if (!query.build(*child))
return false;
return {};
if (query.dag)
query.dag->removeUnusedActions();
@ -146,7 +146,7 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
if (ordinary_reading_marks == 0)
{
reading->setAnalyzedResult(std::move(ordinary_reading_select_result));
return false;
return {};
}
const auto & parts_with_ranges = ordinary_reading_select_result->parts_with_ranges;
@ -185,7 +185,7 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
if (!best_candidate)
{
reading->setAnalyzedResult(std::move(ordinary_reading_select_result));
return false;
return {};
}
auto storage_snapshot = reading->getStorageSnapshot();
@ -283,8 +283,7 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
/// Here we remove last steps from stack to be able to optimize again.
/// In theory, read-in-order can be applied to projection.
stack.resize(iter.base() - stack.begin());
return true;
return best_candidate->projection->name;
}
}

View File

@ -707,11 +707,11 @@ void HTTPHandler::processQuery(
/// The data can also be compressed using an incompatible internal algorithm. This is indicated by
/// 'decompress' query parameter.
std::unique_ptr<ReadBuffer> in_post_maybe_compressed;
bool in_post_compressed = false;
bool is_in_post_compressed = false;
if (params.getParsed<bool>("decompress", false))
{
in_post_maybe_compressed = std::make_unique<CompressedReadBuffer>(*in_post);
in_post_compressed = true;
in_post_maybe_compressed = std::make_unique<CompressedReadBuffer>(*in_post, /* allow_different_codecs_ = */ false, /* external_data_ = */ true);
is_in_post_compressed = true;
}
else
in_post_maybe_compressed = std::move(in_post);
@ -845,7 +845,7 @@ void HTTPHandler::processQuery(
/// If 'http_native_compression_disable_checksumming_on_decompress' setting is turned on,
/// checksums of client data compressed with internal algorithm are not checked.
if (in_post_compressed && settings.http_native_compression_disable_checksumming_on_decompress)
if (is_in_post_compressed && settings.http_native_compression_disable_checksumming_on_decompress)
static_cast<CompressedReadBuffer &>(*in_post_maybe_compressed).disableChecksumming();
/// Add CORS header if 'add_http_cors_header' setting is turned on send * in Access-Control-Allow-Origin

View File

@ -110,6 +110,11 @@ def cluster():
main_configs=[path],
with_azurite=True,
)
cluster.add_instance(
"node3",
main_configs=[path],
with_azurite=True,
)
cluster.start()
yield cluster
@ -216,3 +221,37 @@ def test_backup_restore_on_merge_tree_different_container(cluster):
azure_query(node2, f"DROP TABLE test_simple_merge_tree_different_bucket")
azure_query(node2, f"DROP TABLE test_simple_merge_tree_different_bucket_restored")
def test_backup_restore_on_merge_tree_native_copy_async(cluster):
node3 = cluster.instances["node3"]
azure_query(
node3,
f"CREATE TABLE test_simple_merge_tree_async(key UInt64, data String) Engine = MergeTree() ORDER BY tuple() SETTINGS storage_policy='policy_azure_cache'",
)
azure_query(node3, f"INSERT INTO test_simple_merge_tree_async VALUES (1, 'a')")
backup_destination = f"AzureBlobStorage('{cluster.env_variables['AZURITE_CONNECTION_STRING']}', 'cont', 'test_simple_merge_tree_async_backup')"
print("BACKUP DEST", backup_destination)
azure_query(
node3,
f"BACKUP TABLE test_simple_merge_tree_async TO {backup_destination}",
settings={"azure_max_single_part_copy_size": 0},
)
assert node3.contains_in_log("using native copy")
azure_query(
node3,
f"RESTORE TABLE test_simple_merge_tree_async AS test_simple_merge_tree_async_restored FROM {backup_destination};",
settings={"azure_max_single_part_copy_size": 0},
)
assert (
azure_query(node3, f"SELECT * from test_simple_merge_tree_async_restored")
== "1\ta\n"
)
assert node3.contains_in_log("using native copy")
azure_query(node3, f"DROP TABLE test_simple_merge_tree_async")
azure_query(node3, f"DROP TABLE test_simple_merge_tree_async_restored")

View File

@ -281,7 +281,10 @@ def test_backup_restore_on_merge_tree(cluster):
node = cluster.instances["node"]
azure_query(
node,
f"CREATE TABLE test_simple_merge_tree(key UInt64, data String) Engine = MergeTree() ORDER BY tuple() SETTINGS storage_policy='blob_storage_policy'",
f"""
DROP TABLE IF EXISTS test_simple_merge_tree;
CREATE TABLE test_simple_merge_tree(key UInt64, data String) Engine = MergeTree() ORDER BY tuple() SETTINGS storage_policy='blob_storage_policy'
""",
)
azure_query(node, f"INSERT INTO test_simple_merge_tree VALUES (1, 'a')")
@ -299,3 +302,85 @@ def test_backup_restore_on_merge_tree(cluster):
)
azure_query(node, f"DROP TABLE test_simple_merge_tree")
azure_query(node, f"DROP TABLE test_simple_merge_tree_restored")
def test_backup_restore_correct_block_ids(cluster):
node = cluster.instances["node"]
azure_query(
node,
f"""
DROP TABLE IF EXISTS test_simple_merge_tree;
CREATE TABLE test_simple_merge_tree(key UInt64, data String)
Engine = MergeTree()
ORDER BY tuple()
SETTINGS storage_policy='blob_storage_policy'""",
)
data_query = "SELECT number, repeat('a', 100) FROM numbers(1000)"
azure_query(
node,
f"INSERT INTO test_simple_merge_tree {data_query}",
)
for min_upload_size, max_upload_size, max_blocks, expected_block_size in [
(42, 100, 1000, 42),
(42, 52, 86, 52),
]:
data_path = f"test_backup_correct_block_ids_{max_blocks}"
backup_destination = f"AzureBlobStorage('{cluster.env_variables['AZURITE_CONNECTION_STRING']}', 'cont', '{data_path}')"
azure_query(
node,
f"""
SET azure_min_upload_part_size = {min_upload_size};
SET azure_max_upload_part_size = {max_upload_size};
SET azure_max_blocks_in_multipart_upload = {max_blocks};
BACKUP TABLE test_simple_merge_tree TO {backup_destination} SETTINGS allow_azure_native_copy = 0;
""",
)
port = cluster.azurite_port
connection_string = (
f"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
f"AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
f"BlobEndpoint=http://127.0.0.1:{port}/devstoreaccount1;"
)
container_name = "cont"
blob_service_client = BlobServiceClient.from_connection_string(
connection_string
)
container_client = blob_service_client.get_container_client(container_name)
blobs = container_client.list_blobs()
data_blob = (
f"{data_path}/data/default/test_simple_merge_tree/all_1_1_0/data.bin"
)
found = False
for blob in blobs:
if data_blob == blob.get("name"):
found = True
break
assert found
blob_client = blob_service_client.get_blob_client(
blob=data_blob, container=container_name
)
blocks_num = len(blob_client.get_block_list()[0])
assert blocks_num > 50
count = 0
for block in blob_client.get_block_list()[0]:
count += 1
if count < blocks_num:
assert block.get("size") == expected_block_size
else:
assert block.get("size") < expected_block_size
azure_query(
node,
f"RESTORE TABLE test_simple_merge_tree AS test_simple_merge_tree_restored_{max_blocks} FROM {backup_destination};",
)
assert azure_query(
node,
f"SELECT * from test_simple_merge_tree_restored_{max_blocks} ORDER BY key",
) == node.query(data_query)

View File

@ -19,4 +19,4 @@
<shard>01</shard>
</macros>
</clickhouse>
</clickhouse>

View File

@ -15,6 +15,6 @@
<shard>01</shard>
</macros>
<default_replica_path>/lol/kek/'/{uuid}</default_replica_path>
<default_replica_path>/clickhouse/'/{database}/{table}/{uuid}</default_replica_path>
</clickhouse>

View File

@ -6,7 +6,7 @@ cluster = ClickHouseCluster(__file__)
ch1 = cluster.add_instance(
"ch1",
main_configs=[
"configs/config.d/clusters_unusual.xml",
"configs/config.d/clusters_zk_path.xml",
"configs/config.d/distributed_ddl.xml",
],
with_zookeeper=True,
@ -63,7 +63,7 @@ def check_tables():
)
.strip()
.startswith(
"ReplicatedReplacingMergeTree(\\'/lol/kek/\\\\\\'/{uuid}\\', \\'{replica}\\', D)"
"ReplicatedReplacingMergeTree(\\'/clickhouse/\\\\\\'/{database}/{table}/{uuid}\\', \\'{replica}\\', D)"
)
)
assert (
@ -73,7 +73,7 @@ def check_tables():
)
.strip()
.startswith(
"ReplicatedVersionedCollapsingMergeTree(\\'/lol/kek/\\\\\\'/{uuid}\\', \\'{replica}\\', Sign, Version)"
"ReplicatedVersionedCollapsingMergeTree(\\'/clickhouse/\\\\\\'/{database}/{table}/{uuid}\\', \\'{replica}\\', Sign, Version)"
)
)

View File

@ -0,0 +1,69 @@
import pytest
from test_modify_engine_on_restart.common import (
get_table_path,
set_convert_flags,
)
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
ch1 = cluster.add_instance(
"ch1",
main_configs=[
"configs/config.d/clusters_zk_path.xml",
"configs/config.d/distributed_ddl.xml",
],
with_zookeeper=True,
macros={"replica": "node1"},
stay_alive=True,
)
database_name = "modify_engine_zk_path"
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def q(node, query):
return node.query(database=database_name, sql=query)
def test_modify_engine_fails_if_zk_path_exists(started_cluster):
ch1.query("CREATE DATABASE " + database_name)
q(
ch1,
"CREATE TABLE already_exists_1 ( A Int64, D Date, S String ) ENGINE MergeTree() PARTITION BY toYYYYMM(D) ORDER BY A;",
)
uuid = q(
ch1,
f"SELECT uuid FROM system.tables WHERE table = 'already_exists_1' and database = '{database_name}'",
).strip("'[]\n")
q(
ch1,
f"CREATE TABLE already_exists_2 ( A Int64, D Date, S String ) ENGINE ReplicatedMergeTree('/clickhouse/\\'/{database_name}/already_exists_1/{uuid}', 'r2') PARTITION BY toYYYYMM(D) ORDER BY A;",
)
set_convert_flags(ch1, database_name, ["already_exists_1"])
table_data_path = get_table_path(ch1, "already_exists_1", database_name)
ch1.stop_clickhouse()
ch1.start_clickhouse(retry_start=False, expected_to_fail=True)
# Check if we can cancel the conversion
ch1.exec_in_container(
[
"bash",
"-c",
f"rm {table_data_path}convert_to_replicated",
]
)
ch1.start_clickhouse()

View File

@ -1,4 +0,0 @@
SET send_logs_level = 'fatal';
SET max_block_size = 0;
SELECT number FROM system.numbers; -- { serverError 12 }

View File

@ -132,3 +132,7 @@ SELECT * FROM merge_table ORDER BY id, val;
2 a
2 b
3 c
select sum(number) from numbers(10) settings final=1;
45
select sum(number) from remote('127.0.0.{1,2}', numbers(10)) settings final=1;
90

View File

@ -102,3 +102,6 @@ insert into table_to_merge_c values (3,'c');
-- expected output:
-- 1 c, 2 a, 2 b, 3 c
SELECT * FROM merge_table ORDER BY id, val;
select sum(number) from numbers(10) settings final=1;
select sum(number) from remote('127.0.0.{1,2}', numbers(10)) settings final=1;

View File

@ -103,11 +103,11 @@ SELECT '2^30-1', maxMerge(x) from (select CAST(unhex('ffffff3f') || randomString
SELECT '1M without 0', length(maxMerge(x)) from (select CAST(unhex('00001000') || randomString(0x00100000 - 1) || 'x', 'AggregateFunction(max, String)') as x);
SELECT '1M with 0', length(maxMerge(x)) from (select CAST(unhex('00001000') || randomString(0x00100000 - 1) || '\0', 'AggregateFunction(max, String)') as x);
SELECT 'fuzz1', finalizeAggregation(CAST(unhex('3000000\0303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353600010000000000000000'), 'AggregateFunction(argMax, String, UInt64)')); -- { serverError CORRUPTED_DATA }
SELECT 'fuzz1', finalizeAggregation(CAST(unhex('3000000\0303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353600010000000000000000'), 'AggregateFunction(argMax, String, UInt64)')); -- { serverError INCORRECT_DATA }
SELECT 'fuzz2', finalizeAggregation(CAST(unhex('04000000' || '30313233' || '01' || 'ffffffffffffffff'), 'AggregateFunction(argMax, String, UInt64)')) as x, length(x);
SELECT 'fuzz3', finalizeAggregation(CAST(unhex('04000000' || '30313233' || '00' || 'ffffffffffffffff'), 'AggregateFunction(argMax, String, UInt64)')) as x, length(x); -- { serverError CORRUPTED_DATA }
SELECT 'fuzz4', finalizeAggregation(CAST(unhex('04000000' || '30313233' || '00'), 'AggregateFunction(argMax, String, UInt64)')) as x, length(x); -- { serverError CORRUPTED_DATA }
SELECT 'fuzz5', finalizeAggregation(CAST(unhex('0100000000000000000FFFFFFFF0'), 'AggregateFunction(argMax, UInt64, String)')); -- { serverError CORRUPTED_DATA }
SELECT 'fuzz3', finalizeAggregation(CAST(unhex('04000000' || '30313233' || '00' || 'ffffffffffffffff'), 'AggregateFunction(argMax, String, UInt64)')) as x, length(x); -- { serverError INCORRECT_DATA }
SELECT 'fuzz4', finalizeAggregation(CAST(unhex('04000000' || '30313233' || '00'), 'AggregateFunction(argMax, String, UInt64)')) as x, length(x); -- { serverError INCORRECT_DATA }
SELECT 'fuzz5', finalizeAggregation(CAST(unhex('0100000000000000000FFFFFFFF0'), 'AggregateFunction(argMax, UInt64, String)')); -- { serverError INCORRECT_DATA }
drop table if exists aggr;

View File

@ -1,3 +1,5 @@
DROP TABLE IF EXISTS test;
CREATE TABLE test
(
`id` UInt64,
@ -18,3 +20,16 @@ SELECT name FROM test GROUP BY name SETTINGS force_optimize_projection_name='pro
SELECT name FROM test GROUP BY name SETTINGS force_optimize_projection_name='non_existing_projection'; -- { serverError 117 }
SELECT name FROM test SETTINGS force_optimize_projection_name='projection_name'; -- { serverError 117 }
INSERT INTO test SELECT number, 'test' FROM numbers(1, 100) SETTINGS force_optimize_projection_name='projection_name';
SELECT 1 SETTINGS force_optimize_projection_name='projection_name';
SYSTEM FLUSH LOGS;
SELECT read_rows FROM system.query_log
WHERE current_database = currentDatabase()
AND query LIKE '%SELECT name FROM test%'
AND Settings['force_optimize_projection_name'] = 'projection_name'
AND type = 'ExceptionBeforeStart';
DROP TABLE test;

View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "SELECT count(*) FROM numbers(10) AS a, numbers(11) AS b, numbers(12) AS c SETTINGS max_block_size = 0" 2>&1 | grep -q "Sanity check: 'max_block_size' cannot be 0. Set to default value" && echo "OK" || echo "FAIL"

View File

@ -7,8 +7,6 @@ export LC_ALL=C # The "total" should be printed without localization
TU_EXCLUDES=(
AggregateFunctionUniq
Aggregator
# FIXME: Exclude for now
FunctionsConversion
)
if find $1 -name '*.o' | xargs wc -c | grep --regexp='\.o$' | sort -rn | awk '{ if ($1 > 50000000) print }' \