Merge branch 'master' into combine-requirements-install-docs

This commit is contained in:
Dan Roscigno 2022-10-05 10:39:22 -04:00 committed by GitHub
commit 563eaf9d0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 135 additions and 26 deletions

View File

@ -66,38 +66,60 @@ void WriteBufferFromAzureBlobStorage::finalizeImpl()
{
execWithRetry([this](){ next(); }, DEFAULT_RETRY_NUM);
if (tmp_buffer_write_offset > 0)
uploadBlock(tmp_buffer->data(), tmp_buffer_write_offset);
auto block_blob_client = blob_container_client->GetBlockBlobClient(blob_path);
execWithRetry([&](){ block_blob_client.CommitBlockList(block_ids); }, DEFAULT_RETRY_NUM);
LOG_TRACE(log, "Committed {} blocks for blob `{}`", block_ids.size(), blob_path);
}
void WriteBufferFromAzureBlobStorage::uploadBlock(const char * data, size_t size)
{
auto block_blob_client = blob_container_client->GetBlockBlobClient(blob_path);
const std::string & block_id = block_ids.emplace_back(getRandomASCIIString(64));
Azure::Core::IO::MemoryBodyStream memory_stream(reinterpret_cast<const uint8_t *>(data), size);
execWithRetry([&](){ block_blob_client.StageBlock(block_id, memory_stream); }, DEFAULT_RETRY_NUM);
tmp_buffer_write_offset = 0;
LOG_TRACE(log, "Staged block (id: {}) of size {} (blob path: {}).", block_id, size, blob_path);
}
WriteBufferFromAzureBlobStorage::MemoryBufferPtr WriteBufferFromAzureBlobStorage::allocateBuffer() const
{
return std::make_unique<Memory<>>(max_single_part_upload_size);
}
void WriteBufferFromAzureBlobStorage::nextImpl()
{
if (!offset())
size_t size_to_upload = offset();
if (size_to_upload == 0)
return;
char * buffer_begin = working_buffer.begin();
size_t total_size = offset();
if (!tmp_buffer)
tmp_buffer = allocateBuffer();
auto block_blob_client = blob_container_client->GetBlockBlobClient(blob_path);
size_t current_size = 0;
while (current_size < total_size)
size_t uploaded_size = 0;
while (uploaded_size != size_to_upload)
{
size_t part_len = std::min(total_size - current_size, max_single_part_upload_size);
const std::string & block_id = block_ids.emplace_back(getRandomASCIIString(64));
size_t memory_buffer_remaining_size = max_single_part_upload_size - tmp_buffer_write_offset;
if (memory_buffer_remaining_size == 0)
uploadBlock(tmp_buffer->data(), tmp_buffer->size());
Azure::Core::IO::MemoryBodyStream tmp_buffer(reinterpret_cast<uint8_t *>(buffer_begin + current_size), part_len);
execWithRetry([&](){ block_blob_client.StageBlock(block_id, tmp_buffer); }, DEFAULT_RETRY_NUM);
current_size += part_len;
LOG_TRACE(log, "Staged block (id: {}) of size {} (written {}/{}, blob path: {}).", block_id, part_len, current_size, total_size, blob_path);
size_t size = std::min(memory_buffer_remaining_size, size_to_upload - uploaded_size);
memcpy(tmp_buffer->data() + tmp_buffer_write_offset, working_buffer.begin() + uploaded_size, size);
uploaded_size += size;
tmp_buffer_write_offset += size;
}
if (tmp_buffer_write_offset == max_single_part_upload_size)
uploadBlock(tmp_buffer->data(), tmp_buffer->size());
if (write_settings.remote_throttler)
write_settings.remote_throttler->add(total_size);
write_settings.remote_throttler->add(size_to_upload);
}
}

View File

@ -40,6 +40,7 @@ public:
private:
void finalizeImpl() override;
void execWithRetry(std::function<void()> func, size_t num_tries);
void uploadBlock(const char * data, size_t size);
Poco::Logger * log;
@ -49,6 +50,12 @@ private:
AzureClientPtr blob_container_client;
std::vector<std::string> block_ids;
using MemoryBufferPtr = std::unique_ptr<Memory<>>;
MemoryBufferPtr tmp_buffer;
size_t tmp_buffer_write_offset = 0;
MemoryBufferPtr allocateBuffer() const;
};
}

View File

@ -542,7 +542,10 @@ bool MergeTreeIndexConditionBloomFilter::traverseASTEquals(
{
out.function = RPNElement::FUNCTION_HAS;
const DataTypePtr actual_type = BloomFilter::getPrimitiveType(array_type->getNestedType());
Field converted_field = convertFieldToType(value_field, *actual_type, value_type.get());
auto converted_field = convertFieldToType(value_field, *actual_type, value_type.get());
if (converted_field.isNull())
return false;
out.predicate.emplace_back(std::make_pair(position, BloomFilterHash::hashWithField(actual_type.get(), converted_field)));
}
}
@ -565,7 +568,11 @@ bool MergeTreeIndexConditionBloomFilter::traverseASTEquals(
if ((f.isNull() && !is_nullable) || f.isDecimal(f.getType()))
return false;
mutable_column->insert(convertFieldToType(f, *actual_type, value_type.get()));
auto converted = convertFieldToType(f, *actual_type);
if (converted.isNull())
return false;
mutable_column->insert(converted);
}
column = std::move(mutable_column);
@ -583,7 +590,10 @@ bool MergeTreeIndexConditionBloomFilter::traverseASTEquals(
out.function = function_name == "equals" ? RPNElement::FUNCTION_EQUALS : RPNElement::FUNCTION_NOT_EQUALS;
const DataTypePtr actual_type = BloomFilter::getPrimitiveType(index_type);
Field converted_field = convertFieldToType(value_field, *actual_type, value_type.get());
auto converted_field = convertFieldToType(value_field, *actual_type, value_type.get());
if (converted_field.isNull())
return false;
out.predicate.emplace_back(std::make_pair(position, BloomFilterHash::hashWithField(actual_type.get(), converted_field)));
}
@ -611,9 +621,11 @@ bool MergeTreeIndexConditionBloomFilter::traverseASTEquals(
out.function = RPNElement::FUNCTION_HAS;
const DataTypePtr actual_type = BloomFilter::getPrimitiveType(array_type->getNestedType());
Field converted_field = convertFieldToType(value_field, *actual_type, value_type.get());
out.predicate.emplace_back(std::make_pair(position, BloomFilterHash::hashWithField(actual_type.get(), converted_field)));
auto converted_field = convertFieldToType(value_field, *actual_type, value_type.get());
if (converted_field.isNull())
return false;
out.predicate.emplace_back(std::make_pair(position, BloomFilterHash::hashWithField(actual_type.get(), converted_field)));
return true;
}

View File

@ -10,11 +10,11 @@
<!-- default credentials for Azurite storage account -->
<account_name>devstoreaccount1</account_name>
<account_key>Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==</account_key>
<max_single_part_upload_size>33554432</max_single_part_upload_size>
<max_single_part_upload_size>100000</max_single_part_upload_size>
</blob_storage_disk>
<hdd>
<type>local</type>
<path>/</path>
<type>local</type>
<path>/</path>
</hdd>
</disks>
<policies>

View File

@ -6,6 +6,7 @@ import pytest
from helpers.cluster import ClickHouseCluster
from helpers.utility import generate_values, replace_config, SafeThread
from azure.storage.blob import BlobServiceClient
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@ -571,10 +572,42 @@ def test_restart_during_load(cluster):
def test_big_insert(cluster):
node = cluster.instances[NODE_NAME]
create_table(node, TABLE_NAME)
check_query = "SELECT '2020-01-03', number, toString(number) FROM numbers(1000000)"
azure_query(
node,
f"INSERT INTO {TABLE_NAME} SELECT '2020-01-03', number, toString(number) FROM numbers(1000000)",
f"INSERT INTO {TABLE_NAME} {check_query}",
)
assert azure_query(node, f"SELECT * FROM {TABLE_NAME} ORDER BY id") == node.query(
"SELECT '2020-01-03', number, toString(number) FROM numbers(1000000)"
check_query
)
blob_container_client = cluster.blob_service_client.get_container_client(
CONTAINER_NAME
)
blobs = blob_container_client.list_blobs()
max_single_part_upload_size = 100000
checked = False
for blob in blobs:
blob_client = cluster.blob_service_client.get_blob_client(
CONTAINER_NAME, blob.name
)
committed, uncommited = blob_client.get_block_list()
blocks = committed
last_id = len(blocks)
id = 1
if len(blocks) > 1:
checked = True
for block in blocks:
print(f"blob: {blob.name}, block size: {block.size}")
if id == last_id:
assert max_single_part_upload_size >= block.size
else:
assert max_single_part_upload_size == block.size
id += 1
assert checked

View File

@ -0,0 +1,7 @@
0
0
0
0
0
0
0

View File

@ -0,0 +1,28 @@
{% for type in ["Int8", "Int16", "Int32", "Int64", 'UInt8', 'UInt16', 'UInt32'] -%}
DROP TABLE IF EXISTS bftest__fuzz_21;
CREATE TABLE bftest__fuzz_21
(
`k` Int64,
`x` Array({{ type }}),
INDEX ix1 x TYPE bloom_filter GRANULARITY 3
)
ENGINE = MergeTree
ORDER BY k;
INSERT INTO bftest__fuzz_21 (k, x) SELECT
number,
arrayMap(i -> (rand64() % 565656), range(10))
FROM numbers(1000);
{% if 'UInt' in type -%}
SELECT count() FROM bftest__fuzz_21 WHERE hasAll(x, [42, -42]) SETTINGS use_skip_indexes=1;
SELECT count() FROM bftest__fuzz_21 WHERE hasAll(x, [42, -42]) SETTINGS use_skip_indexes=1, force_data_skipping_indices='ix1'; -- { serverError INDEX_NOT_USED }
{% else -%}
SELECT count() FROM bftest__fuzz_21 WHERE hasAll(x, [42, -42]) SETTINGS use_skip_indexes=1, force_data_skipping_indices='ix1';
{% endif -%}
DROP TABLE IF EXISTS bftest__fuzz_21;
{% endfor -%}