Merge branch 'master' into keeper-some-improvement2

Antonio Andelic 2024-09-07 20:59:04 +02:00
commit 65019c4b9b
5 changed files with 137 additions and 188 deletions

View File

@@ -42,21 +42,19 @@ Keep an eye out for upcoming meetups and events around the world. Somewhere else
The following upcoming meetups feature ClickHouse creator and CTO Alexey Milovidov:
* [ClickHouse Guangzhou User Group Meetup](https://mp.weixin.qq.com/s/GSvo-7xUoVzCsuUvlLTpCw) - August 25
* [San Francisco Meetup (Cloudflare)](https://www.meetup.com/clickhouse-silicon-valley-meetup-group/events/302540575) - September 5
* [Raleigh Meetup (Deutsche Bank)](https://www.meetup.com/triangletechtalks/events/302723486/) - September 9
* [New York Meetup (Rokt)](https://www.meetup.com/clickhouse-new-york-user-group/events/302575342) - September 10
* [Chicago Meetup (Jump Capital)](https://lu.ma/43tvmrfw) - September 12
Other upcoming meetups
* [Seattle Meetup (Statsig)](https://www.meetup.com/clickhouse-seattle-user-group/events/302518075/) - August 27
* [Melbourne Meetup](https://www.meetup.com/clickhouse-australia-user-group/events/302732666/) - August 27
* [Sydney Meetup](https://www.meetup.com/clickhouse-australia-user-group/events/302862966/) - September 5
* [Zurich Meetup](https://www.meetup.com/clickhouse-switzerland-meetup-group/events/302267429/) - September 5
* [Toronto Meetup (Shopify)](https://www.meetup.com/clickhouse-toronto-user-group/events/301490855/) - September 10
* [Austin Meetup](https://www.meetup.com/clickhouse-austin-user-group/events/302558689/) - September 17
* [London Meetup](https://www.meetup.com/clickhouse-london-user-group/events/302977267) - September 17
* [Bangalore Meetup](https://www.meetup.com/clickhouse-bangalore-user-group/events/303208274/) - September 18
* [Tel Aviv Meetup](https://www.meetup.com/clickhouse-meetup-israel/events/303095121) - September 22
* [Jakarta Meetup](https://www.meetup.com/clickhouse-indonesia-user-group/events/303191359/) - October 1
* [Singapore Meetup](https://www.meetup.com/clickhouse-singapore-meetup-group/events/303212064/) - October 3
* [Madrid Meetup](https://www.meetup.com/clickhouse-spain-user-group/events/303096564/) - October 22
* [Barcelona Meetup](https://www.meetup.com/clickhouse-spain-user-group/events/303096876/) - October 29
* [Oslo Meetup](https://www.meetup.com/open-source-real-time-data-warehouse-real-time-analytics/events/302938622) - October 31
@@ -64,7 +62,13 @@ Other upcoming meetups
* [Dubai Meetup](https://www.meetup.com/clickhouse-dubai-meetup-group/events/303096989/) - November 21
* [Paris Meetup](https://www.meetup.com/clickhouse-france-user-group/events/303096434) - November 26
Recently completed events
* [ClickHouse Guangzhou User Group Meetup](https://mp.weixin.qq.com/s/GSvo-7xUoVzCsuUvlLTpCw) - August 25
* [Seattle Meetup (Statsig)](https://www.meetup.com/clickhouse-seattle-user-group/events/302518075/) - August 27
* [Melbourne Meetup](https://www.meetup.com/clickhouse-australia-user-group/events/302732666/) - August 27
* [Sydney Meetup](https://www.meetup.com/clickhouse-australia-user-group/events/302862966/) - September 5
* [Zurich Meetup](https://www.meetup.com/clickhouse-switzerland-meetup-group/events/302267429/) - September 5
* [San Francisco Meetup (Cloudflare)](https://www.meetup.com/clickhouse-silicon-valley-meetup-group/events/302540575) - September 5
## Recent Recordings
* **Recent Meetup Videos**: [Meetup Playlist](https://www.youtube.com/playlist?list=PL0Z2YDlm0b3iNDUzpY1S3L_iV4nARda_U) Whenever possible, recordings of the ClickHouse Community Meetups are edited and presented as individual talks. Currently featuring "Modern SQL in 2023", "Fast, Concurrent, and Consistent Asynchronous INSERTS in ClickHouse", and "Full-Text Indices: Design and Experiments".

View File

@@ -6,6 +6,7 @@
# include <Columns/ColumnString.h>
# include <Functions/LowerUpperImpl.h>
# include <base/scope_guard.h>
# include <unicode/ucasemap.h>
# include <unicode/unistr.h>
# include <unicode/urename.h>
@@ -49,6 +50,11 @@ struct LowerUpperUTF8Impl
if (U_FAILURE(error_code))
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Error calling ucasemap_open: {}", u_errorName(error_code));
SCOPE_EXIT(
{
ucasemap_close(case_map);
});
size_t curr_offset = 0;
for (size_t row_i = 0; row_i < input_rows_count; ++row_i)
{
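The hunk above registers a SCOPE_EXIT guard immediately after `ucasemap_open` succeeds, so `ucasemap_close(case_map)` runs on every exit path, including when processing a later row throws. A rough Python analogue of the same cleanup-on-exit idea, with illustrative `ucasemap_*` stand-ins rather than real ICU bindings:

```python
import contextlib

def ucasemap_open(locale: str):
    # stand-in for ICU's ucasemap_open (illustrative only)
    print(f"open case map for locale {locale!r}")
    return object()

def ucasemap_close(handle) -> None:
    # stand-in for ICU's ucasemap_close (illustrative only)
    print("close case map")

@contextlib.contextmanager
def scoped_case_map(locale: str):
    handle = ucasemap_open(locale)
    try:
        yield handle
    finally:
        ucasemap_close(handle)  # always runs, mirroring SCOPE_EXIT in the hunk

with scoped_case_map(""):
    pass  # per-row case-mapping work would go here; cleanup runs even on exceptions
```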

View File

@@ -10,8 +10,6 @@ import string
import ast
import math
from multiprocessing.dummy import Pool
import avro.schema
import avro.io
import avro.datafile
@@ -1000,11 +998,7 @@ def test_kafka_formats(kafka_cluster, create_query_generator):
}
topic_postfix = str(hash(create_query_generator))
p = Pool(10)
results = []
def run_for_format(format_name, format_opts):
for format_name, format_opts in list(all_formats.items()):
logging.debug(f"Set up {format_name}")
topic_name = f"format_tests_{format_name}-{topic_postfix}"
data_sample = format_opts["data_sample"]
@@ -1040,13 +1034,6 @@ def test_kafka_formats(kafka_cluster, create_query_generator):
),
)
)
for format_name, format_opts in list(all_formats.items()):
results.append(p.apply_async(run_for_format, args=(format_name, format_opts)))
for result in results:
result.get()
raw_expected = """\
0 0 AM 0.5 1 {topic_name} 0 {offset_0}
1 0 AM 0.5 1 {topic_name} 0 {offset_1}
@@ -1077,9 +1064,7 @@ def test_kafka_formats(kafka_cluster, create_query_generator):
)
assert result_checker(res)
results = []
def run_for_format2(format_name, format_opts):
for format_name, format_opts in list(all_formats.items()):
logging.debug("Checking {}".format(format_name))
topic_name = f"format_tests_{format_name}-{topic_postfix}"
# shift offsets by 1 if format supports empty value
@@ -1103,12 +1088,6 @@ def test_kafka_formats(kafka_cluster, create_query_generator):
)
kafka_delete_topic(get_admin_client(kafka_cluster), topic_name)
for format_name, format_opts in list(all_formats.items()):
results.append(p.apply_async(run_for_format2, args=(format_name, format_opts)))
for result in results:
result.get()
# Since everything is async and shaky when receiving messages from Kafka,
# we may want to try and check results multiple times in a loop.
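The hunks in this file consistently replace the thread-pool fan-out (`Pool(10)` plus `apply_async`, collecting each `AsyncResult` so worker exceptions surface via `.get()`) with plain sequential loops over `all_formats`. A minimal sketch of the two dispatch styles, with an illustrative `check_format` standing in for the tests' real per-format bodies:

```python
from multiprocessing.dummy import Pool  # thread-backed Pool with the multiprocessing API

def check_format(format_name: str) -> None:
    # stand-in for the per-format setup/check logic in these tests
    print(f"checking {format_name}")

all_formats = ["TSV", "JSONEachRow", "Native"]  # illustrative subset

# Style being removed: fan out to 10 worker threads, then .get() every
# AsyncResult so any worker exception propagates and fails the test.
p = Pool(10)
results = [p.apply_async(check_format, args=(name,)) for name in all_formats]
for result in results:
    result.get()

# Style being restored by this merge: one format at a time.
for name in all_formats:
    check_format(name)
```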
@@ -4258,11 +4237,7 @@ def test_kafka_formats_with_broken_message(kafka_cluster, create_query_generator
topic_name_prefix = "format_tests_4_stream_"
topic_name_postfix = get_topic_postfix(create_query_generator)
p = Pool(10)
results = []
def run_for_format(format_name, format_opts):
for format_name, format_opts in list(all_formats.items()):
logging.debug(f"Set up {format_name}")
topic_name = f"{topic_name_prefix}{format_name}{topic_name_postfix}"
data_sample = format_opts["data_sample"]
@@ -4303,12 +4278,6 @@ def test_kafka_formats_with_broken_message(kafka_cluster, create_query_generator
"""
)
for format_name, format_opts in list(all_formats.items()):
results.append(p.apply_async(run_for_format, args=(format_name, format_opts)))
for result in results:
result.get()
raw_expected = """\
0 0 AM 0.5 1 {topic_name} 0 {offset_0}
1 0 AM 0.5 1 {topic_name} 0 {offset_1}
@@ -4339,9 +4308,7 @@ def test_kafka_formats_with_broken_message(kafka_cluster, create_query_generator
)
assert result_checker(res)
results = []
def run_for_format2(format_name, format_opts):
for format_name, format_opts in list(all_formats.items()):
logging.debug(f"Checking {format_name}")
topic_name = f"{topic_name_prefix}{format_name}{topic_name_postfix}"
# shift offsets by 1 if format supports empty value
@@ -4381,12 +4348,6 @@ def test_kafka_formats_with_broken_message(kafka_cluster, create_query_generator
), "Proper error for format: {}".format(format_name)
kafka_delete_topic(admin_client, topic_name)
for format_name, format_opts in list(all_formats.items()):
results.append(p.apply_async(run_for_format2, args=(format_name, format_opts)))
for result in results:
result.get()
@pytest.mark.parametrize(
"create_query_generator",
@@ -4870,63 +4831,6 @@ def test_max_rows_per_message(kafka_cluster, create_query_generator):
def test_row_based_formats(kafka_cluster, create_query_generator):
admin_client = get_admin_client(kafka_cluster)
p = Pool(10)
def run_for_format_name(format_name):
logging.debug("Checking {format_name}")
topic_name = format_name + get_topic_postfix(create_query_generator)
view_name = f"kafka_view_{format_name}"
table_name = f"kafka_{format_name}"
with kafka_topic(admin_client, topic_name):
num_rows = 10
max_rows_per_message = 5
message_count = num_rows / max_rows_per_message
create_query = create_query_generator(
table_name,
"key UInt64, value UInt64",
topic_list=topic_name,
consumer_group=topic_name,
format=format_name,
settings={"kafka_max_rows_per_message": max_rows_per_message},
)
instance.query(
f"""
DROP TABLE IF EXISTS test.{view_name};
DROP TABLE IF EXISTS test.{table_name};
{create_query};
CREATE MATERIALIZED VIEW test.{view_name} ENGINE=MergeTree ORDER BY (key, value) AS
SELECT key, value FROM test.{table_name};
INSERT INTO test.{table_name} SELECT number * 10 as key, number * 100 as value FROM numbers({num_rows});
"""
)
messages = kafka_consume_with_retry(
kafka_cluster, topic_name, message_count, need_decode=False
)
assert (
len(messages) == message_count
), f"Invalid message count for {format_name}"
instance.query_with_retry(
f"SELECT count() FROM test.{view_name}",
check_callback=lambda res: int(res) == num_rows,
)
result = instance.query(f"SELECT * FROM test.{view_name}")
expected = ""
for i in range(num_rows):
expected += str(i * 10) + "\t" + str(i * 100) + "\n"
assert result == expected, f"Invalid result for {format_name}"
results = []
for format_name in [
"TSV",
"TSVWithNamesAndTypes",
@@ -4945,10 +4849,55 @@ def test_row_based_formats(kafka_cluster, create_query_generator):
"RowBinaryWithNamesAndTypes",
"MsgPack",
]:
results.append(p.apply_async(run_for_format_name, args=(format_name,)))
logging.debug("Checking {format_name}")
for result in results:
result.get()
topic_name = format_name + get_topic_postfix(create_query_generator)
table_name = f"kafka_{format_name}"
with kafka_topic(admin_client, topic_name):
num_rows = 10
max_rows_per_message = 5
message_count = num_rows / max_rows_per_message
create_query = create_query_generator(
table_name,
"key UInt64, value UInt64",
topic_list=topic_name,
consumer_group=topic_name,
format=format_name,
settings={"kafka_max_rows_per_message": max_rows_per_message},
)
instance.query(
f"""
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.{table_name};
{create_query};
CREATE MATERIALIZED VIEW test.view ENGINE=MergeTree ORDER BY (key, value) AS
SELECT key, value FROM test.{table_name};
INSERT INTO test.{table_name} SELECT number * 10 as key, number * 100 as value FROM numbers({num_rows});
"""
)
messages = kafka_consume_with_retry(
kafka_cluster, topic_name, message_count, need_decode=False
)
assert len(messages) == message_count
instance.query_with_retry(
"SELECT count() FROM test.view",
check_callback=lambda res: int(res) == num_rows,
)
result = instance.query("SELECT * FROM test.view")
expected = ""
for i in range(num_rows):
expected += str(i * 10) + "\t" + str(i * 100) + "\n"
assert result == expected
@pytest.mark.parametrize(
@@ -5006,12 +4955,16 @@ def test_block_based_formats_2(kafka_cluster, create_query_generator):
num_rows = 100
message_count = 9
p = Pool(10)
def run_for_format_name(format_name):
for format_name in [
"JSONColumns",
"Native",
"Arrow",
"Parquet",
"ORC",
"JSONCompactColumns",
]:
topic_name = format_name + get_topic_postfix(create_query_generator)
table_name = f"kafka_{format_name}"
view_name = f"kafka_view_{format_name}"
logging.debug(f"Checking format {format_name}")
with kafka_topic(admin_client, topic_name):
create_query = create_query_generator(
@@ -5024,12 +4977,12 @@ def test_block_based_formats_2(kafka_cluster, create_query_generator):
instance.query(
f"""
DROP TABLE IF EXISTS test.{view_name};
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.{table_name};
{create_query};
CREATE MATERIALIZED VIEW test.{view_name} ENGINE=MergeTree ORDER BY (key, value) AS
CREATE MATERIALIZED VIEW test.view ENGINE=MergeTree ORDER BY (key, value) AS
SELECT key, value FROM test.{table_name};
INSERT INTO test.{table_name} SELECT number * 10 as key, number * 100 as value FROM numbers({num_rows}) settings max_block_size=12, optimize_trivial_insert_select=0;
@@ -5038,38 +4991,22 @@ def test_block_based_formats_2(kafka_cluster, create_query_generator):
messages = kafka_consume_with_retry(
kafka_cluster, topic_name, message_count, need_decode=False
)
assert (
len(messages) == message_count
), f"Invalid message count for {format_name}"
assert len(messages) == message_count
rows = int(
instance.query_with_retry(
f"SELECT count() FROM test.{view_name}",
"SELECT count() FROM test.view",
check_callback=lambda res: int(res) == num_rows,
)
)
assert rows == num_rows, f"Invalid row count for {format_name}"
assert rows == num_rows
result = instance.query(f"SELECT * FROM test.{view_name} ORDER by key")
result = instance.query("SELECT * FROM test.view ORDER by key")
expected = ""
for i in range(num_rows):
expected += str(i * 10) + "\t" + str(i * 100) + "\n"
assert result == expected, f"Invalid result for {format_name}"
results = []
for format_name in [
"JSONColumns",
"Native",
"Arrow",
"Parquet",
"ORC",
"JSONCompactColumns",
]:
results.append(p.apply_async(run_for_format_name, args=(format_name,)))
for result in results:
result.get()
assert result == expected
def test_system_kafka_consumers(kafka_cluster):
@@ -5363,54 +5300,6 @@ def test_formats_errors(kafka_cluster):
bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)
)
p = Pool(10)
def run_for_format_name(format_name):
with kafka_topic(admin_client, format_name):
table_name = f"kafka_{format_name}"
view_name = f"kafka_view_{format_name}"
instance.query(
f"""
DROP TABLE IF EXISTS test.{view_name};
DROP TABLE IF EXISTS test.{table_name};
CREATE TABLE test.{table_name} (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = '{format_name}',
kafka_group_name = '{format_name}',
kafka_format = '{format_name}',
kafka_max_rows_per_message = 5,
format_template_row='template_row.format',
format_regexp='id: (.+?)',
input_format_with_names_use_header=0,
format_schema='key_value_message:Message';
CREATE MATERIALIZED VIEW test.{view_name} ENGINE=MergeTree ORDER BY (key, value) AS
SELECT key, value FROM test.{table_name};
"""
)
kafka_produce(
kafka_cluster,
format_name,
["Broken message\nBroken message\nBroken message\n"],
)
num_errors = int(
instance.query_with_retry(
f"SELECT length(exceptions.text) from system.kafka_consumers where database = 'test' and table = '{table_name}'",
check_callback=lambda res: int(res) > 0,
)
)
assert num_errors > 0, f"No errors for {format_name}"
instance.query(f"DROP TABLE test.{table_name}")
instance.query(f"DROP TABLE test.{view_name}")
results = []
for format_name in [
"Template",
"Regexp",
@@ -5453,10 +5342,48 @@ def test_formats_errors(kafka_cluster):
"HiveText",
"MySQLDump",
]:
results.append(p.apply_async(run_for_format_name, args=(format_name,)))
with kafka_topic(admin_client, format_name):
table_name = f"kafka_{format_name}"
for result in results:
result.get()
instance.query(
f"""
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.{table_name};
CREATE TABLE test.{table_name} (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = '{format_name}',
kafka_group_name = '{format_name}',
kafka_format = '{format_name}',
kafka_max_rows_per_message = 5,
format_template_row='template_row.format',
format_regexp='id: (.+?)',
input_format_with_names_use_header=0,
format_schema='key_value_message:Message';
CREATE MATERIALIZED VIEW test.view ENGINE=MergeTree ORDER BY (key, value) AS
SELECT key, value FROM test.{table_name};
"""
)
kafka_produce(
kafka_cluster,
format_name,
["Broken message\nBroken message\nBroken message\n"],
)
num_errors = int(
instance.query_with_retry(
f"SELECT length(exceptions.text) from system.kafka_consumers where database = 'test' and table = '{table_name}'",
check_callback=lambda res: int(res) > 0,
)
)
assert num_errors > 0
instance.query(f"DROP TABLE test.{table_name}")
instance.query("DROP TABLE test.view")
@pytest.mark.parametrize(
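A recurring pattern in these hunks is polling: `instance.query_with_retry(sql, check_callback=...)` re-runs a query until the callback accepts the result, e.g. waiting for `system.kafka_consumers` to report exceptions above. A self-contained sketch of that helper's shape, assuming a simple retry loop (parameter names beyond `sql` and `check_callback` are assumptions; the real implementation lives in the test framework):

```python
import time

def query_with_retry(run_query, sql, check_callback, retries=20, sleep=0.5):
    """Re-run sql until check_callback accepts the result, then return it."""
    last = None
    for _ in range(retries):
        last = run_query(sql)
        if check_callback(last):
            return last
        time.sleep(sleep)
    raise TimeoutError(f"no acceptable result for {sql!r}, last was {last!r}")

# Toy usage with a fake query runner that "succeeds" on the third attempt.
attempts = {"n": 0}
def fake_query(_sql):
    attempts["n"] += 1
    return str(attempts["n"])

print(query_with_retry(fake_query, "SELECT 1", lambda res: int(res) >= 3, sleep=0))
```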

View File

@@ -0,0 +1 @@
españa

View File

@@ -0,0 +1,11 @@
#!/usr/bin/env bash
# Tags: no-fasttest
# no-fasttest: upper/lowerUTF8 use ICU
# Test for issue #69336
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_LOCAL --query "SELECT lowerUTF8('ESPAÑA')"
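This new test pairs with the one-line reference file above (`españa`): lowerUTF8 must apply full Unicode case mapping rather than byte-wise ASCII lowering. As a quick sanity check of the expected output, Python's built-in case mapping gives the same result:

```python
# Ñ (U+00D1) must map to ñ (U+00F1); ASCII-only lowering would leave it unchanged.
assert "ESPAÑA".lower() == "españa"
```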