Merge branch 'master' into stress-oom

This commit is contained in:
Alexey Milovidov 2021-04-01 01:43:40 +03:00
commit e67d93d165
78 changed files with 1354 additions and 277 deletions

View File

@ -11,11 +11,6 @@ if (NOT MSVC)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wextra")
endif ()
if (USE_DEBUG_HELPERS)
set (INCLUDE_DEBUG_HELPERS "-I${ClickHouse_SOURCE_DIR}/base -include ${ClickHouse_SOURCE_DIR}/src/Core/iostream_debug_helpers.h")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${INCLUDE_DEBUG_HELPERS}")
endif ()
# Add some warnings that are not available even with -Wall -Wextra -Wpedantic.
# Intended for exploration of new compiler warnings that may be found useful.
# Applies to clang only

2
contrib/arrow vendored

@ -1 +1 @@
Subproject commit 744bdfe188f018e5e05f5deebd4e9ee0a7706cf4
Subproject commit 616b3dc76a0c8450b4027ded8a78e9619d7c845f

View File

@ -3,7 +3,7 @@
<mysql_port remove="remove"/>
<interserver_http_port remove="remove"/>
<tcp_with_proxy_port remove="remove"/>
<test_keeper_server remove="remove"/>
<keeper_server remove="remove"/>
<listen_host>::</listen_host>
<logger>

View File

@ -13,6 +13,25 @@ dpkg -i package_folder/clickhouse-test_*.deb
function start()
{
if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]; then
# NOTE We run "clickhouse server" instead of "clickhouse-server"
# to make "pidof clickhouse-server" return single pid of the main instance.
# We wil run main instance using "service clickhouse-server start"
sudo -E -u clickhouse /usr/bin/clickhouse server --config /etc/clickhouse-server1/config.xml --daemon \
-- --path /var/lib/clickhouse1/ --logger.stderr /var/log/clickhouse-server/stderr1.log \
--logger.log /var/log/clickhouse-server/clickhouse-server1.log --logger.errorlog /var/log/clickhouse-server/clickhouse-server1.err.log \
--tcp_port 19000 --tcp_port_secure 19440 --http_port 18123 --https_port 18443 --interserver_http_port 19009 --tcp_with_proxy_port 19010 \
--mysql_port 19004 \
--keeper_server.tcp_port 19181 --keeper_server.server_id 2
sudo -E -u clickhouse /usr/bin/clickhouse server --config /etc/clickhouse-server2/config.xml --daemon \
-- --path /var/lib/clickhouse2/ --logger.stderr /var/log/clickhouse-server/stderr2.log \
--logger.log /var/log/clickhouse-server/clickhouse-server2.log --logger.errorlog /var/log/clickhouse-server/clickhouse-server2.err.log \
--tcp_port 29000 --tcp_port_secure 29440 --http_port 28123 --https_port 28443 --interserver_http_port 29009 --tcp_with_proxy_port 29010 \
--mysql_port 29004 \
--keeper_server.tcp_port 29181 --keeper_server.server_id 3
fi
counter=0
until clickhouse-client --query "SELECT 1"
do
@ -35,9 +54,8 @@ start
/s3downloader --dataset-names $DATASETS
chmod 777 -R /var/lib/clickhouse
clickhouse-client --query "SHOW DATABASES"
clickhouse-client --query "ATTACH DATABASE datasets ENGINE = Ordinary"
clickhouse-client --query "CREATE DATABASE test"
clickhouse-client --query "ATTACH DATABASE datasets ENGINE = Ordinary"
service clickhouse-server restart
# Wait for server to start accepting connections
@ -47,24 +65,50 @@ for _ in {1..120}; do
done
clickhouse-client --query "SHOW TABLES FROM datasets"
clickhouse-client --query "SHOW TABLES FROM test"
clickhouse-client --query "RENAME TABLE datasets.hits_v1 TO test.hits"
clickhouse-client --query "RENAME TABLE datasets.visits_v1 TO test.visits"
clickhouse-client --query "SHOW TABLES FROM test"
if grep -q -- "--use-skip-list" /usr/bin/clickhouse-test ; then
SKIP_LIST_OPT="--use-skip-list"
fi
# We can have several additional options so we path them as array because it's
# more idiologically correct.
read -ra ADDITIONAL_OPTIONS <<< "${ADDITIONAL_OPTIONS:-}"
if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]; then
ADDITIONAL_OPTIONS+=('--replicated-database')
clickhouse-client --query "CREATE DATABASE test ON CLUSTER 'test_cluster_database_replicated'
ENGINE=Replicated('/test/clickhouse/db/test', '{shard}', '{replica}')"
clickhouse-client --query "CREATE TABLE test.hits AS datasets.hits_v1"
clickhouse-client --query "CREATE TABLE test.visits AS datasets.visits_v1"
clickhouse-client --query "INSERT INTO test.hits SELECT * FROM datasets.hits_v1"
clickhouse-client --query "INSERT INTO test.visits SELECT * FROM datasets.visits_v1"
clickhouse-client --query "DROP TABLE datasets.hits_v1"
clickhouse-client --query "DROP TABLE datasets.visits_v1"
MAX_RUN_TIME=$((MAX_RUN_TIME < 9000 ? MAX_RUN_TIME : 9000)) # min(MAX_RUN_TIME, 2.5 hours)
MAX_RUN_TIME=$((MAX_RUN_TIME != 0 ? MAX_RUN_TIME : 9000)) # set to 2.5 hours if 0 (unlimited)
else
clickhouse-client --query "CREATE DATABASE test"
clickhouse-client --query "SHOW TABLES FROM test"
clickhouse-client --query "RENAME TABLE datasets.hits_v1 TO test.hits"
clickhouse-client --query "RENAME TABLE datasets.visits_v1 TO test.visits"
fi
clickhouse-test --testname --shard --zookeeper --no-stateless --hung-check --print-time "$SKIP_LIST_OPT" "${ADDITIONAL_OPTIONS[@]}" "$SKIP_TESTS_OPTION" 2>&1 | ts '%Y-%m-%d %H:%M:%S' | tee test_output/test_result.txt
clickhouse-client --query "SHOW TABLES FROM test"
clickhouse-client --query "SELECT count() FROM test.hits"
clickhouse-client --query "SELECT count() FROM test.visits"
function run_tests()
{
set -x
# We can have several additional options so we path them as array because it's
# more idiologically correct.
read -ra ADDITIONAL_OPTIONS <<< "${ADDITIONAL_OPTIONS:-}"
if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]; then
ADDITIONAL_OPTIONS+=('--replicated-database')
fi
clickhouse-test --testname --shard --zookeeper --no-stateless --hung-check --use-skip-list --print-time "${ADDITIONAL_OPTIONS[@]}" \
"$SKIP_TESTS_OPTION" 2>&1 | ts '%Y-%m-%d %H:%M:%S' | tee test_output/test_result.txt
}
export -f run_tests
timeout "$MAX_RUN_TIME" bash -c run_tests ||:
./process_functional_tests_result.py || echo -e "failure\tCannot parse results" > /test_output/check_status.tsv
@ -73,3 +117,9 @@ mv /var/log/clickhouse-server/stderr.log /test_output/ ||:
if [[ -n "$WITH_COVERAGE" ]] && [[ "$WITH_COVERAGE" -eq 1 ]]; then
tar -chf /test_output/clickhouse_coverage.tar.gz /profraw ||:
fi
if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]; then
pigz < /var/log/clickhouse-server/clickhouse-server1.log > /test_output/clickhouse-server1.log.gz ||:
pigz < /var/log/clickhouse-server/clickhouse-server2.log > /test_output/clickhouse-server2.log.gz ||:
mv /var/log/clickhouse-server/stderr1.log /test_output/ ||:
mv /var/log/clickhouse-server/stderr2.log /test_output/ ||:
fi

View File

@ -12,6 +12,8 @@ UNKNOWN_SIGN = "[ UNKNOWN "
SKIPPED_SIGN = "[ SKIPPED "
HUNG_SIGN = "Found hung queries in processlist"
NO_TASK_TIMEOUT_SIGN = "All tests have finished"
def process_test_log(log_path):
total = 0
skipped = 0
@ -19,10 +21,13 @@ def process_test_log(log_path):
failed = 0
success = 0
hung = False
task_timeout = True
test_results = []
with open(log_path, 'r') as test_file:
for line in test_file:
line = line.strip()
if NO_TASK_TIMEOUT_SIGN in line:
task_timeout = False
if HUNG_SIGN in line:
hung = True
if any(sign in line for sign in (OK_SIGN, FAIL_SING, UNKNOWN_SIGN, SKIPPED_SIGN)):
@ -52,7 +57,7 @@ def process_test_log(log_path):
else:
success += int(OK_SIGN in line)
test_results.append((test_name, "OK", test_time))
return total, skipped, unknown, failed, success, hung, test_results
return total, skipped, unknown, failed, success, hung, task_timeout, test_results
def process_result(result_path):
test_results = []
@ -68,7 +73,7 @@ def process_result(result_path):
state = "error"
if result_path and os.path.exists(result_path):
total, skipped, unknown, failed, success, hung, test_results = process_test_log(result_path)
total, skipped, unknown, failed, success, hung, task_timeout, test_results = process_test_log(result_path)
is_flacky_check = 1 < int(os.environ.get('NUM_TRIES', 1))
# If no tests were run (success == 0) it indicates an error (e.g. server did not start or crashed immediately)
# But it's Ok for "flaky checks" - they can contain just one test for check which is marked as skipped.
@ -78,6 +83,9 @@ def process_result(result_path):
if hung:
description = "Some queries hung, "
state = "failure"
elif task_timeout:
description = "Timeout, "
state = "failure"
else:
description = ""

View File

@ -34,17 +34,37 @@ if [ "$NUM_TRIES" -gt "1" ]; then
# simpliest way to forward env variables to server
sudo -E -u clickhouse /usr/bin/clickhouse-server --config /etc/clickhouse-server/config.xml --daemon
sleep 5
else
service clickhouse-server start && sleep 5
service clickhouse-server start
fi
if grep -q -- "--use-skip-list" /usr/bin/clickhouse-test; then
SKIP_LIST_OPT="--use-skip-list"
if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]; then
sudo -E -u clickhouse /usr/bin/clickhouse server --config /etc/clickhouse-server1/config.xml --daemon \
-- --path /var/lib/clickhouse1/ --logger.stderr /var/log/clickhouse-server/stderr1.log \
--logger.log /var/log/clickhouse-server/clickhouse-server1.log --logger.errorlog /var/log/clickhouse-server/clickhouse-server1.err.log \
--tcp_port 19000 --tcp_port_secure 19440 --http_port 18123 --https_port 18443 --interserver_http_port 19009 --tcp_with_proxy_port 19010 \
--mysql_port 19004 \
--keeper_server.tcp_port 19181 --keeper_server.server_id 2 \
--macros.replica r2 # It doesn't work :(
sudo -E -u clickhouse /usr/bin/clickhouse server --config /etc/clickhouse-server2/config.xml --daemon \
-- --path /var/lib/clickhouse2/ --logger.stderr /var/log/clickhouse-server/stderr2.log \
--logger.log /var/log/clickhouse-server/clickhouse-server2.log --logger.errorlog /var/log/clickhouse-server/clickhouse-server2.err.log \
--tcp_port 29000 --tcp_port_secure 29440 --http_port 28123 --https_port 28443 --interserver_http_port 29009 --tcp_with_proxy_port 29010 \
--mysql_port 29004 \
--keeper_server.tcp_port 29181 --keeper_server.server_id 3 \
--macros.shard s2 # It doesn't work :(
MAX_RUN_TIME=$((MAX_RUN_TIME < 9000 ? MAX_RUN_TIME : 9000)) # min(MAX_RUN_TIME, 2.5 hours)
MAX_RUN_TIME=$((MAX_RUN_TIME != 0 ? MAX_RUN_TIME : 9000)) # set to 2.5 hours if 0 (unlimited)
fi
sleep 5
function run_tests()
{
set -x
# We can have several additional options so we path them as array because it's
# more idiologically correct.
read -ra ADDITIONAL_OPTIONS <<< "${ADDITIONAL_OPTIONS:-}"
@ -63,8 +83,7 @@ function run_tests()
fi
clickhouse-test --testname --shard --zookeeper --hung-check --print-time \
--test-runs "$NUM_TRIES" \
"$SKIP_LIST_OPT" "${ADDITIONAL_OPTIONS[@]}" 2>&1 \
--use-skip-list --test-runs "$NUM_TRIES" "${ADDITIONAL_OPTIONS[@]}" 2>&1 \
| ts '%Y-%m-%d %H:%M:%S' \
| tee -a test_output/test_result.txt
}
@ -88,3 +107,10 @@ if [[ -n "$WITH_COVERAGE" ]] && [[ "$WITH_COVERAGE" -eq 1 ]]; then
fi
tar -chf /test_output/text_log_dump.tar /var/lib/clickhouse/data/system/text_log ||:
tar -chf /test_output/query_log_dump.tar /var/lib/clickhouse/data/system/query_log ||:
if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]; then
pigz < /var/log/clickhouse-server/clickhouse-server1.log > /test_output/clickhouse-server1.log.gz ||:
pigz < /var/log/clickhouse-server/clickhouse-server2.log > /test_output/clickhouse-server2.log.gz ||:
mv /var/log/clickhouse-server/stderr1.log /test_output/ ||:
mv /var/log/clickhouse-server/stderr2.log /test_output/ ||:
fi

View File

@ -12,9 +12,12 @@ toc_title: Adopters
|---------|----------|---------|--------------|------------------------------------------------------------------------------|-----------|
| <a href="https://2gis.ru" class="favicon">2gis</a> | Maps | Monitoring | — | — | [Talk in Russian, July 2019](https://youtu.be/58sPkXfq6nw) |
| <a href="https://getadmiral.com/" class="favicon">Admiral</a> | Martech | Engagement Management | — | — | [Webinar Slides, June 2020](https://altinity.com/presentations/2020/06/16/big-data-in-real-time-how-clickhouse-powers-admirals-visitor-relationships-for-publishers) |
| <a href="http://www.adscribe.tv/" class="favicon">AdScribe</a> | Ads | TV Analytics | — | — | [A quote from CTO](https://altinity.com/24x7-support/) |
| <a href="https://cn.aliyun.com/" class="favicon">Alibaba Cloud</a> | Cloud | Managed Service | — | — | [Official Website](https://help.aliyun.com/product/144466.html) |
| <a href="https://alohabrowser.com/" class="favicon">Aloha Browser</a> | Mobile App | Browser backend | — | — | [Slides in Russian, May 2019](https://presentations.clickhouse.tech/meetup22/aloha.pdf) |
| <a href="https://altinity.com/" class="favicon">Altinity</a> | Cloud, SaaS | Main product | — | — | [Official Website](https://altinity.com/) |
| <a href="https://amadeus.com/" class="favicon">Amadeus</a> | Travel | Analytics | — | — | [Press Release, April 2018](https://www.altinity.com/blog/2018/4/5/amadeus-technologies-launches-investment-and-insights-tool-based-on-machine-learning-and-strategy-algorithms) |
| <a href="https://apiroad.net/" class="favicon">ApiRoad</a> | API marketplace | Analytics | — | — | [Blog post, Nov 2018, Mar 2020](https://pixeljets.com/blog/clickhouse-vs-elasticsearch/) |
| <a href="https://www.appsflyer.com" class="favicon">Appsflyer</a> | Mobile analytics | Main product | — | — | [Talk in Russian, July 2019](https://www.youtube.com/watch?v=M3wbRlcpBbY) |
| <a href="https://arenadata.tech/" class="favicon">ArenaData</a> | Data Platform | Main product | — | — | [Slides in Russian, December 2019](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup38/indexes.pdf) |
| <a href="https://avito.ru/" class="favicon">Avito</a> | Classifieds | Monitoring | — | — | [Meetup, April 2020](https://www.youtube.com/watch?v=n1tm4j4W8ZQ) |
@ -37,6 +40,7 @@ toc_title: Adopters
| <a href="https://www.creditx.com" class="favicon">CraiditX 氪信</a> | Finance AI | Analysis | — | — | [Slides in English, November 2019](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup33/udf.pptx) |
| <a href="https://crazypanda.ru/en/" class="favicon">Crazypanda</a> | Games | | — | — | Live session on ClickHouse meetup |
| <a href="https://www.criteo.com/" class="favicon">Criteo</a> | Retail | Main product | — | — | [Slides in English, October 2018](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup18/3_storetail.pptx) |
| <a href="https://cryptology.com/" class="favicon">Cryptology</a> | Digital Assets Trading Platform | — | — | — | [Job advertisement, March 2021](https://career.habr.com/companies/cryptology/vacancies) |
| <a href="https://www.chinatelecomglobal.com/" class="favicon">Dataliance for China Telecom</a> | Telecom | Analytics | — | — | [Slides in Chinese, January 2018](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup12/telecom.pdf) |
| <a href="https://db.com" class="favicon">Deutsche Bank</a> | Finance | BI Analytics | — | — | [Slides in English, October 2019](https://bigdatadays.ru/wp-content/uploads/2019/10/D2-H3-3_Yakunin-Goihburg.pdf) |
| <a href="https://deeplay.io/eng/" class="favicon">Deeplay</a> | Gaming Analytics | — | — | — | [Job advertisement, 2020](https://career.habr.com/vacancies/1000062568) |
@ -49,6 +53,7 @@ toc_title: Adopters
| <a href="https://fun.co/rp" class="favicon">FunCorp</a> | Games | | — | 14 bn records/day as of Jan 2021 | [Article](https://www.altinity.com/blog/migrating-from-redshift-to-clickhouse) |
| <a href="https://geniee.co.jp" class="favicon">Geniee</a> | Ad network | Main product | — | — | [Blog post in Japanese, July 2017](https://tech.geniee.co.jp/entry/2017/07/20/160100) |
| <a href="https://www.genotek.ru/" class="favicon">Genotek</a> | Bioinformatics | Main product | — | — | [Video, August 2020](https://youtu.be/v3KyZbz9lEE) |
| <a href="https://glaber.io/" class="favicon">Glaber</a> | Monitoring | Main product | — | — | [Website](https://glaber.io/) |
| <a href="https://www.huya.com/" class="favicon">HUYA</a> | Video Streaming | Analytics | — | — | [Slides in Chinese, October 2018](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup19/7.%20ClickHouse万亿数据分析实践%20李本旺(sundy-li)%20虎牙.pdf) |
| <a href="https://www.the-ica.com/" class="favicon">ICA</a> | FinTech | Risk Management | — | — | [Blog Post in English, Sep 2020](https://altinity.com/blog/clickhouse-vs-redshift-performance-for-fintech-risk-management?utm_campaign=ClickHouse%20vs%20RedShift&utm_content=143520807&utm_medium=social&utm_source=twitter&hss_channel=tw-3894792263) |
| <a href="https://www.idealista.com" class="favicon">Idealista</a> | Real Estate | Analytics | — | — | [Blog Post in English, April 2019](https://clickhouse.tech/blog/en/clickhouse-meetup-in-madrid-on-april-2-2019) |
@ -65,15 +70,18 @@ toc_title: Adopters
| <a href="https://www.lbl.gov" class="favicon">Lawrence Berkeley National Laboratory</a> | Research | Traffic analysis | 1 server | 11.8 TiB | [Slides in English, April 2019](https://www.smitasin.com/presentations/2019-04-17_DOE-NSM.pdf) |
| <a href="https://lifestreet.com/" class="favicon">LifeStreet</a> | Ad network | Main product | 75 servers (3 replicas) | 5.27 PiB | [Blog post in Russian, February 2017](https://habr.com/en/post/322620/) |
| <a href="https://mcs.mail.ru/" class="favicon">Mail.ru Cloud Solutions</a> | Cloud services | Main product | — | — | [Article in Russian](https://mcs.mail.ru/help/db-create/clickhouse#) |
| <a href="https://maxilect.com/" class="favicon">MAXILECT</a> | Ad Tech, Blockchain, ML, AI | — | — | — | [Job advertisement, 2021](https://www.linkedin.com/feed/update/urn:li:activity:6780842017229430784/) |
| <a href="https://tech.mymarilyn.ru" class="favicon">Marilyn</a> | Advertising | Statistics | — | — | [Talk in Russian, June 2017](https://www.youtube.com/watch?v=iXlIgx2khwc) |
| <a href="https://mellodesign.ru/" class="favicon">Mello</a> | Marketing | Analytics | 1 server | — | [Article, Oct 2020](https://vc.ru/marketing/166180-razrabotka-tipovogo-otcheta-skvoznoy-analitiki) |
| <a href="https://www.messagebird.com" class="favicon">MessageBird</a> | Telecommunications | Statistics | — | — | [Slides in English, November 2018](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup20/messagebird.pdf) |
| <a href="https://www.mindsdb.com/" class="favicon">MindsDB</a> | Machine Learning | Main Product | — | — | [Official Website](https://www.mindsdb.com/blog/machine-learning-models-as-tables-in-ch) |x
| <a href="https://mux.com/" class="favicon">MUX</a> | Online Video | Video Analytics | — | — | [Talk in English, August 2019](https://altinity.com/presentations/2019/8/13/how-clickhouse-became-the-default-analytics-database-for-mux/) |
| <a href="https://www.mgid.com/" class="favicon">MGID</a> | Ad network | Web-analytics | — | — | [Blog post in Russian, April 2020](http://gs-studio.com/news-about-it/32777----clickhouse---c) |
| <a href="https://www.netskope.com/" class="favicon">Netskope</a> | Network Security | — | — | — | [Job advertisement, March 2021](https://www.mendeley.com/careers/job/senior-software-developer-backend-developer-1346348) |
| <a href="https://getnoc.com/" class="favicon">NOC Project</a> | Network Monitoring | Analytics | Main Product | — | [Official Website](https://getnoc.com/features/big-data/) |
| <a href="https://www.nuna.com/" class="favicon">Nuna Inc.</a> | Health Data Analytics | — | — | — | [Talk in English, July 2020](https://youtu.be/GMiXCMFDMow?t=170) |
| <a href="https://www.oneapm.com/" class="favicon">OneAPM</a> | Monitorings and Data Analysis | Main product | — | — | [Slides in Chinese, October 2018](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup19/8.%20clickhouse在OneAPM的应用%20杜龙.pdf) |
| <a href="https://corp.ozon.com/" class="favicon">OZON</a> | E-commerce | — | — | — | [Official website](https://job.ozon.ru/vacancy/razrabotchik-clickhouse-ekspluatatsiya-40991870/) |
| <a href="https://panelbear.com/" class="favicon">Panelbear | Analytics | Monitoring and Analytics | — | — | [Tech Stack, November 2020](https://panelbear.com/blog/tech-stack/) |
| <a href="https://www.percent.cn/" class="favicon">Percent 百分点</a> | Analytics | Main Product | — | — | [Slides in Chinese, June 2019](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup24/4.%20ClickHouse万亿数据双中心的设计与实践%20.pdf) |
| <a href="https://www.percona.com/" class="favicon">Percona</a> | Performance analysis | Percona Monitoring and Management | — | — | [Official website, Mar 2020](https://www.percona.com/blog/2020/03/30/advanced-query-analysis-in-percona-monitoring-and-management-with-direct-clickhouse-access/) |
@ -92,12 +100,14 @@ toc_title: Adopters
| <a href="https://www.s7.ru" class="favicon">S7 Airlines</a> | Airlines | Metrics, Logging | — | — | [Talk in Russian, March 2019](https://www.youtube.com/watch?v=nwG68klRpPg&t=15s) |
| <a href="https://www.scireum.de/" class="favicon">scireum GmbH</a> | e-Commerce | Main product | — | — | [Talk in German, February 2020](https://www.youtube.com/watch?v=7QWAn5RbyR4) |
| <a href="https://segment.com/" class="favicon">Segment</a> | Data processing | Main product | 9 * i3en.3xlarge nodes 7.5TB NVME SSDs, 96GB Memory, 12 vCPUs | — | [Slides, 2019](https://slides.com/abraithwaite/segment-clickhouse) |
| <a href="https://sembot.io/" class="favicon">sembot.io</a> | Shopping Ads | — | — | — | A comment on LinkedIn, 2020 |
| <a href="https://www.semrush.com/" class="favicon">SEMrush</a> | Marketing | Main product | — | — | [Slides in Russian, August 2018](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup17/5_semrush.pdf) |
| <a href="https://sentry.io/" class="favicon">Sentry</a> | Software Development | Main product | — | — | [Blog Post in English, May 2019](https://blog.sentry.io/2019/05/16/introducing-snuba-sentrys-new-search-infrastructure) |
| <a href="https://seo.do/" class="favicon">seo.do</a> | Analytics | Main product | — | — | [Slides in English, November 2019](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup35/CH%20Presentation-%20Metehan%20Çetinkaya.pdf) |
| <a href="http://www.sgk.gov.tr/wps/portal/sgk/tr" class="favicon">SGK</a> | Goverment Social Security | Analytics | — | — | [Slides in English, November 2019](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup35/ClickHouse%20Meetup-Ramazan%20POLAT.pdf) |
| <a href="http://english.sina.com/index.html" class="favicon">Sina</a> | News | — | — | — | [Slides in Chinese, October 2018](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup19/6.%20ClickHouse最佳实践%20高鹏_新浪.pdf) |
| <a href="https://smi2.ru/" class="favicon">SMI2</a> | News | Analytics | — | — | [Blog Post in Russian, November 2017](https://habr.com/ru/company/smi2/blog/314558/) |
| <a href="https://www.spark.co.nz/" class="favicon">Spark New Zealand</a> | Telecommunications | Security Operations | — | — | [Blog Post, Feb 2020](https://blog.n0p.me/2020/02/2020-02-05-dnsmonster/) |
| <a href="https://www.splunk.com/" class="favicon">Splunk</a> | Business Analytics | Main product | — | — | [Slides in English, January 2018](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup12/splunk.pdf) |
| <a href="https://www.spotify.com" class="favicon">Spotify</a> | Music | Experimentation | — | — | [Slides, July 2018](https://www.slideshare.net/glebus/using-clickhouse-for-experimentation-104247173) |
| <a href="https://www.staffcop.ru/" class="favicon">Staffcop</a> | Information Security | Main Product | — | — | [Official website, Documentation](https://www.staffcop.ru/sce43) |
@ -106,13 +116,17 @@ toc_title: Adopters
| <a href="https://www.tencent.com" class="favicon">Tencent</a> | Big Data | Data processing | — | — | [Slides in Chinese, October 2018](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup19/5.%20ClickHouse大数据集群应用_李俊飞腾讯网媒事业部.pdf) |
| <a href="https://www.tencent.com" class="favicon">Tencent</a> | Messaging | Logging | — | — | [Talk in Chinese, November 2019](https://youtu.be/T-iVQRuw-QY?t=5050) |
| <a href="https://www.tencentmusic.com/" class="favicon">Tencent Music Entertainment (TME)</a> | BigData | Data processing | — | — | [Blog in Chinese, June 2020](https://cloud.tencent.com/developer/article/1637840) |
| <a href="https://www.tinybird.co/" class="favicon">Tinybird</a> | Real-time Data Products | Data processing | — | — | [Official website](https://www.tinybird.co/) |
| <a href="https://trafficstars.com/" class="favicon">Traffic Stars</a> | AD network | — | — | — | [Slides in Russian, May 2018](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup15/lightning/ninja.pdf) |
| <a href="https://www.uber.com" class="favicon">Uber</a> | Taxi | Logging | — | — | [Slides, February 2020](https://presentations.clickhouse.tech/meetup40/uber.pdf) |
| <a href="https://vk.com" class="favicon">VKontakte</a> | Social Network | Statistics, Logging | — | — | [Slides in Russian, August 2018](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup17/3_vk.pdf) |
| <a href="https://www.vmware.com/" class="favicon">VMWare</a> | Cloud | VeloCloud, SDN | — | — | [Product documentation](https://docs.vmware.com/en/vRealize-Operations-Manager/8.3/com.vmware.vcom.metrics.doc/GUID-A9AD72E1-C948-4CA2-971B-919385AB3CA8.html) |
| <a href="https://www.walmartlabs.com/" class="favicon">Walmart Labs</a> | Internet, Retail | — | — | — | [Talk in English, July 2020](https://youtu.be/GMiXCMFDMow?t=144) |
| <a href="https://wargaming.com/en/" class="favicon">Wargaming</a> | Games | | — | — | [Interview](https://habr.com/en/post/496954/) |
| <a href="https://www.wildberries.ru/" class="favicon">Wildberries</a> | E-commerce | | — | — | [Official website](https://it.wildberries.ru/) |
| <a href="https://wisebits.com/" class="favicon">Wisebits</a> | IT Solutions | Analytics | — | — | [Slides in Russian, May 2019](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup22/strategies.pdf) |
| <a href="https://www.workato.com/" class="favicon">Workato</a> | Automation Software | — | — | — | [Talk in English, July 2020](https://youtu.be/GMiXCMFDMow?t=334) |
| <a href="https://xenoss.io/" class="favicon">Xenoss</a> | Marketing, Advertising | — | — | — | [Instagram, March 2021](https://www.instagram.com/p/CNATV7qBgB1/) |
| <a href="http://www.xiaoxintech.cn/" class="favicon">Xiaoxin Tech</a> | Education | Common purpose | — | — | [Slides in English, November 2019](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup33/sync-clickhouse-with-mysql-mongodb.pptx) |
| <a href="https://www.ximalaya.com/" class="favicon">Ximalaya</a> | Audio sharing | OLAP | — | — | [Slides in English, November 2019](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup33/ximalaya.pdf) |
| <a href="https://cloud.yandex.ru/services/managed-clickhouse" class="favicon">Yandex Cloud</a> | Public Cloud | Main product | — | — | [Talk in Russian, December 2019](https://www.youtube.com/watch?v=pgnak9e_E0o) |
@ -122,7 +136,9 @@ toc_title: Adopters
| <a href="https://htc-cs.ru/" class="favicon">ЦВТ</a> | Software Development | Metrics, Logging | — | — | [Blog Post, March 2019, in Russian](https://vc.ru/dev/62715-kak-my-stroili-monitoring-na-prometheus-clickhouse-i-elk) |
| <a href="https://mkb.ru/" class="favicon">МКБ</a> | Bank | Web-system monitoring | — | — | [Slides in Russian, September 2019](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup28/mkb.pdf) |
| <a href="https://cft.ru/" class="favicon">ЦФТ</a> | Banking, Financial products, Payments | — | — | — | [Meetup in Russian, April 2020](https://team.cft.ru/events/162) |
| <a href="https://promo.croc.ru/digitalworker" class="favicon">Цифровой Рабочий</a> | Industrial IoT, Analytics | — | — | — | [Blog post in Russian, March 2021](https://habr.com/en/company/croc/blog/548018/) |
| <a href="https://www.kakaocorp.com/" class="favicon">kakaocorp</a> | Internet company | — | — | — | [if(kakao)2020 conference](https://if.kakao.com/session/117) |
| <a href="https://shop.okraina.ru/" class="favicon">ООО «МПЗ Богородский»</a> | Agriculture | — | — | — | [Article in Russian, November 2020](https://cloud.yandex.ru/cases/okraina) |
| <a href="https://www.tesla.com/" class="favicon">Tesla</a> | Electric vehicle and clean energy company | — | — | — | [Vacancy description, March 2021](https://news.ycombinator.com/item?id=26306170) |
[Original article](https://clickhouse.tech/docs/en/introduction/adopters/) <!--hide-->

View File

@ -0,0 +1,35 @@
---
toc_priority: 43
toc_title: Files
---
# Functions for Working with Files {#functions-for-working-with-files}
## file {#file}
Reads file as a String. The file content is not parsed, so any information is read as one string and placed into the specified column.
**Syntax**
``` sql
file(path)
```
**Arguments**
- `path` — The relative path to the file from [user_files_path](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-user_files_path). Path to file support following wildcards: `*`, `?`, `{abc,def}` and `{N..M}` where `N`, `M` — numbers, `'abc', 'def'` — strings.
**Example**
Inserting data from files a.txt and b.txt into a table as strings:
Query:
``` sql
INSERT INTO table SELECT file('a.txt'), file('b.txt');
```
**See Also**
- [user_files_path](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-user_files_path)
- [file](../table-functions/file.md)

View File

@ -0,0 +1,33 @@
---
toc_priority: 43
toc_title: "Функции для работы с файлами"
---
# Функции для работы с файлами {#funktsii-dlia-raboty-s-failami}
## file {#file}
Читает файл как строку. Содержимое файла не разбирается (не парсится) и записывается в указанную колонку в виде единой строки.
**Синтаксис**
``` sql
file(path)
```
**Аргументы**
- `path` — относительный путь до файла от [user_files_path](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-user_files_path). Путь к файлу может включать следующие символы подстановки и шаблоны: `*`, `?`, `{abc,def}` и `{N..M}`, где `N`, `M` — числа, `'abc', 'def'` — строки.
**Примеры**
Вставка данных из файлов a.txt и b.txt в таблицу в виде строк:
``` sql
INSERT INTO table SELECT file('a.txt'), file('b.txt');
```
**Смотрите также**
- [user_files_path](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-user_files_path)
- [file](../table-functions/file.md)

View File

@ -27,6 +27,11 @@ configure_file (Common/config.h.in ${CONFIG_COMMON})
configure_file (Common/config_version.h.in ${CONFIG_VERSION})
configure_file (Core/config_core.h.in ${CMAKE_CURRENT_BINARY_DIR}/Core/include/config_core.h)
if (USE_DEBUG_HELPERS)
set (INCLUDE_DEBUG_HELPERS "-I${ClickHouse_SOURCE_DIR}/base -include ${ClickHouse_SOURCE_DIR}/src/Core/iostream_debug_helpers.h")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${INCLUDE_DEBUG_HELPERS}")
endif ()
if (COMPILER_GCC)
# If we leave this optimization enabled, gcc-7 replaces a pair of SSE intrinsics (16 byte load, store) with a call to memcpy.
# It leads to slow code. This is compiler bug. It looks like this:

View File

@ -78,7 +78,10 @@ String Macros::expand(const String & s,
/// Prefer explicit macros over implicit.
if (it != macros.end() && !info.expand_special_macros_only)
{
res += it->second;
info.expanded_other = true;
}
else if (macro_name == "database" && !info.table_id.database_name.empty())
{
res += info.table_id.database_name;

View File

@ -40,6 +40,7 @@ public:
bool expanded_database = false;
bool expanded_table = false;
bool expanded_uuid = false;
bool expanded_other = false;
bool has_unknown = false;
};

View File

@ -141,7 +141,7 @@ class IColumn;
M(UInt64, optimize_min_equality_disjunction_chain_length, 3, "The minimum length of the expression `expr = x1 OR ... expr = xN` for optimization ", 0) \
\
M(UInt64, min_bytes_to_use_direct_io, 0, "The minimum number of bytes for reading the data with O_DIRECT option during SELECT queries execution. 0 - disabled.", 0) \
M(UInt64, min_bytes_to_use_mmap_io, 0, "The minimum number of bytes for reading the data with mmap option during SELECT queries execution. 0 - disabled.", 0) \
M(UInt64, min_bytes_to_use_mmap_io, (64 * 1024 * 1024), "The minimum number of bytes for reading the data with mmap option during SELECT queries execution. 0 - disabled.", 0) \
M(Bool, checksum_on_read, true, "Validate checksums on reading. It is enabled by default and should be always enabled in production. Please do not expect any benefits in disabling this setting. It may only be used for experiments and benchmarks. The setting only applicable for tables of MergeTree family. Checksums are always validated for other table engines and when receiving data over network.", 0) \
\
M(Bool, force_index_by_date, 0, "Throw an exception if there is a partition key in a table, and it is not used.", 0) \

View File

@ -23,6 +23,7 @@
#include <Parsers/parseQuery.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Parsers/formatAST.h>
#include <Common/Macros.h>
namespace DB
{
@ -309,20 +310,66 @@ void DatabaseReplicated::loadStoredObjects(Context & context, bool has_force_res
ddl_worker->startup();
}
BlockIO DatabaseReplicated::tryEnqueueReplicatedDDL(const ASTPtr & query, const Context & query_context)
void DatabaseReplicated::checkQueryValid(const ASTPtr & query, const Context & query_context) const
{
if (is_readonly)
throw Exception(ErrorCodes::NO_ZOOKEEPER, "Database is in readonly mode, because it cannot connect to ZooKeeper");
if (query_context.getClientInfo().query_kind != ClientInfo::QueryKind::INITIAL_QUERY)
throw Exception(ErrorCodes::INCORRECT_QUERY, "It's not initial query. ON CLUSTER is not allowed for Replicated database.");
/// Replicas will set correct name of current database in query context (database name can be different on replicas)
if (auto * ddl_query = query->as<ASTQueryWithTableAndOutput>())
if (auto * ddl_query = dynamic_cast<ASTQueryWithTableAndOutput *>(query.get()))
{
if (ddl_query->database != getDatabaseName())
throw Exception(ErrorCodes::UNKNOWN_DATABASE, "Database was renamed");
ddl_query->database.clear();
if (auto * create = query->as<ASTCreateQuery>())
{
bool replicated_table = create->storage && create->storage->engine && startsWith(create->storage->engine->name, "Replicated");
if (!replicated_table || !create->storage->engine->arguments)
return;
ASTs & args = create->storage->engine->arguments->children;
if (args.size() < 2)
return;
ASTLiteral * arg1 = args[0]->as<ASTLiteral>();
ASTLiteral * arg2 = args[1]->as<ASTLiteral>();
if (!arg1 || !arg2 || arg1->value.getType() != Field::Types::String || arg2->value.getType() != Field::Types::String)
return;
String maybe_path = arg1->value.get<String>();
String maybe_replica = arg2->value.get<String>();
/// Looks like it's ReplicatedMergeTree with explicit zookeeper_path and replica_name arguments.
/// Let's ensure that some macros are used.
/// NOTE: we cannot check here that substituted values will be actually different on shards and replicas.
Macros::MacroExpansionInfo info;
info.table_id = {getDatabaseName(), create->table, create->uuid};
query_context.getMacros()->expand(maybe_path, info);
bool maybe_shard_macros = info.expanded_other;
info.expanded_other = false;
query_context.getMacros()->expand(maybe_replica, info);
bool maybe_replica_macros = info.expanded_other;
bool enable_functional_tests_helper = global_context.getConfigRef().has("_functional_tests_helper_database_replicated_replace_args_macros");
if (!enable_functional_tests_helper)
LOG_WARNING(log, "It's not recommended to explicitly specify zookeeper_path and replica_name in ReplicatedMergeTree arguments");
if (maybe_shard_macros && maybe_replica_macros)
return;
if (enable_functional_tests_helper)
{
if (maybe_path.empty() || maybe_path.back() != '/')
maybe_path += '/';
arg1->value = maybe_path + "auto_{shard}";
arg2->value = maybe_replica + "auto_{replica}";
return;
}
throw Exception(ErrorCodes::INCORRECT_QUERY,
"Explicit zookeeper_path and replica_name are specified in ReplicatedMergeTree arguments. "
"If you really want to specify it explicitly, then you should use some macros "
"to distinguish different shards and replicas");
}
}
if (const auto * query_alter = query->as<ASTAlterQuery>())
@ -343,7 +390,17 @@ BlockIO DatabaseReplicated::tryEnqueueReplicatedDDL(const ASTPtr & query, const
"Use DETACH TABLE PERMANENTLY or SYSTEM RESTART REPLICA or set "
"database_replicated_always_detach_permanently to 1");
}
}
BlockIO DatabaseReplicated::tryEnqueueReplicatedDDL(const ASTPtr & query, const Context & query_context)
{
if (is_readonly)
throw Exception(ErrorCodes::NO_ZOOKEEPER, "Database is in readonly mode, because it cannot connect to ZooKeeper");
if (query_context.getClientInfo().query_kind != ClientInfo::QueryKind::INITIAL_QUERY)
throw Exception(ErrorCodes::INCORRECT_QUERY, "It's not initial query. ON CLUSTER is not allowed for Replicated database.");
checkQueryValid(query, query_context);
LOG_DEBUG(log, "Proposing query: {}", queryToString(query));
DDLLogEntry entry;

View File

@ -71,6 +71,8 @@ private:
bool createDatabaseNodesInZooKeeper(const ZooKeeperPtr & current_zookeeper);
void createReplicaNodesInZooKeeper(const ZooKeeperPtr & current_zookeeper);
void checkQueryValid(const ASTPtr & query, const Context & query_context) const;
void recoverLostReplica(const ZooKeeperPtr & current_zookeeper, UInt32 our_log_ptr, UInt32 max_log_ptr);
std::map<String, String> tryGetConsistentMetadataSnapshot(const ZooKeeperPtr & zookeeper, UInt32 & max_log_ptr);

View File

@ -125,7 +125,7 @@ String DatabaseReplicatedDDLWorker::tryEnqueueAndExecuteEntry(DDLLogEntry & entr
});
if (!processed)
throw Exception(ErrorCodes::UNFINISHED, "Timeout: Cannot enqueue query on this replica,"
throw Exception(ErrorCodes::UNFINISHED, "Timeout: Cannot enqueue query on this replica, "
"most likely because replica is busy with previous queue entries");
}

View File

@ -648,7 +648,7 @@ void DiskS3::moveFile(const String & from_path, const String & to_path)
if (send_metadata)
{
auto revision = ++revision_counter;
const DiskS3::ObjectMetadata object_metadata {
const ObjectMetadata object_metadata {
{"from_path", from_path},
{"to_path", to_path}
};
@ -942,20 +942,32 @@ void DiskS3::startup()
LOG_INFO(&Poco::Logger::get("DiskS3"), "Starting up disk {}", name);
/// Find last revision.
if (readSchemaVersion(bucket, s3_root_path) < RESTORABLE_SCHEMA_VERSION)
migrateToRestorableSchema();
findLastRevision();
LOG_INFO(&Poco::Logger::get("DiskS3"), "Disk {} started up", name);
}
void DiskS3::findLastRevision()
{
UInt64 l = 0, r = LATEST_REVISION;
while (l < r)
{
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Check revision in bounds {}-{}", l, r);
auto revision = l + (r - l + 1) / 2;
if (revision == 0)
break;
auto revision_str = revisionToString(revision);
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Check object with revision {}", revision);
/// Check file or operation with such revision exists.
if (checkObjectExists(s3_root_path + "r" + revision_str)
|| checkObjectExists(s3_root_path + "operations/r" + revision_str))
if (checkObjectExists(bucket, s3_root_path + "r" + revision_str)
|| checkObjectExists(bucket, s3_root_path + "operations/r" + revision_str))
l = revision;
else
r = revision - 1;
@ -964,10 +976,127 @@ void DiskS3::startup()
LOG_INFO(&Poco::Logger::get("DiskS3"), "Found last revision number {} for disk {}", revision_counter, name);
}
bool DiskS3::checkObjectExists(const String & prefix)
int DiskS3::readSchemaVersion(const String & source_bucket, const String & source_path)
{
int version = 0;
if (!checkObjectExists(source_bucket, source_path + SCHEMA_VERSION_OBJECT))
return version;
ReadBufferFromS3 buffer (client, source_bucket, source_path + SCHEMA_VERSION_OBJECT);
readIntText(version, buffer);
return version;
}
void DiskS3::saveSchemaVersion(const int & version)
{
WriteBufferFromS3 buffer (client, bucket, s3_root_path + SCHEMA_VERSION_OBJECT, min_upload_part_size, max_single_part_upload_size);
writeIntText(version, buffer);
buffer.finalize();
}
void DiskS3::updateObjectMetadata(const String & key, const ObjectMetadata & metadata)
{
Aws::S3::Model::CopyObjectRequest request;
request.SetCopySource(bucket + "/" + key);
request.SetBucket(bucket);
request.SetKey(key);
request.SetMetadata(metadata);
request.SetMetadataDirective(Aws::S3::Model::MetadataDirective::REPLACE);
auto outcome = client->CopyObject(request);
throwIfError(outcome);
}
void DiskS3::migrateFileToRestorableSchema(const String & path)
{
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Migrate file {} to restorable schema", metadata_path + path);
auto meta = readMeta(path);
for (const auto & [key, _] : meta.s3_objects)
{
ObjectMetadata metadata {
{"path", path}
};
updateObjectMetadata(s3_root_path + key, metadata);
}
}
void DiskS3::migrateToRestorableSchemaRecursive(const String & path, Futures & results)
{
checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks.
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Migrate directory {} to restorable schema", metadata_path + path);
bool dir_contains_only_files = true;
for (auto it = iterateDirectory(path); it->isValid(); it->next())
if (isDirectory(it->path()))
{
dir_contains_only_files = false;
break;
}
/// The whole directory can be migrated asynchronously.
if (dir_contains_only_files)
{
auto result = getExecutor().execute([this, path]
{
for (auto it = iterateDirectory(path); it->isValid(); it->next())
migrateFileToRestorableSchema(it->path());
});
results.push_back(std::move(result));
}
else
{
for (auto it = iterateDirectory(path); it->isValid(); it->next())
if (!isDirectory(it->path()))
{
auto source_path = it->path();
auto result = getExecutor().execute([this, source_path]
{
migrateFileToRestorableSchema(source_path);
});
results.push_back(std::move(result));
}
else
migrateToRestorableSchemaRecursive(it->path(), results);
}
}
void DiskS3::migrateToRestorableSchema()
{
try
{
LOG_INFO(&Poco::Logger::get("DiskS3"), "Start migration to restorable schema for disk {}", name);
Futures results;
for (const auto & root : data_roots)
if (exists(root))
migrateToRestorableSchemaRecursive(root + '/', results);
for (auto & result : results)
result.wait();
for (auto & result : results)
result.get();
saveSchemaVersion(RESTORABLE_SCHEMA_VERSION);
}
catch (const Exception & e)
{
LOG_ERROR(&Poco::Logger::get("DiskS3"), "Failed to migrate to restorable schema. Code: {}, e.displayText() = {}, Stack trace:\n\n{}", e.code(), e.displayText(), e.getStackTraceString());
throw;
}
}
bool DiskS3::checkObjectExists(const String & source_bucket, const String & prefix)
{
Aws::S3::Model::ListObjectsV2Request request;
request.SetBucket(bucket);
request.SetBucket(source_bucket);
request.SetPrefix(prefix);
request.SetMaxKeys(1);
@ -1048,7 +1177,7 @@ struct DiskS3::RestoreInformation
void DiskS3::readRestoreInformation(DiskS3::RestoreInformation & restore_information)
{
ReadBufferFromFile buffer(metadata_path + restore_file_name, 512);
ReadBufferFromFile buffer(metadata_path + RESTORE_FILE_NAME, 512);
buffer.next();
/// Empty file - just restore all metadata.
@ -1083,7 +1212,7 @@ void DiskS3::readRestoreInformation(DiskS3::RestoreInformation & restore_informa
void DiskS3::restore()
{
if (!exists(restore_file_name))
if (!exists(RESTORE_FILE_NAME))
return;
try
@ -1110,17 +1239,27 @@ void DiskS3::restore()
throw Exception("Restoring to the same bucket is allowed only if source path is not a sub-path of configured path in S3 disk", ErrorCodes::BAD_ARGUMENTS);
}
///TODO: Cleanup FS and bucket if previous restore was failed.
if (readSchemaVersion(information.source_bucket, information.source_path) < RESTORABLE_SCHEMA_VERSION)
throw Exception("Source bucket doesn't have restorable schema.", ErrorCodes::BAD_ARGUMENTS);
LOG_INFO(&Poco::Logger::get("DiskS3"), "Starting to restore disk {}. Revision: {}, Source bucket: {}, Source path: {}",
name, information.revision, information.source_bucket, information.source_path);
LOG_INFO(&Poco::Logger::get("DiskS3"), "Removing old metadata...");
bool cleanup_s3 = information.source_bucket != bucket || information.source_path != s3_root_path;
for (const auto & root : data_roots)
if (exists(root))
removeSharedRecursive(root + '/', !cleanup_s3);
restoreFiles(information.source_bucket, information.source_path, information.revision);
restoreFileOperations(information.source_bucket, information.source_path, information.revision);
Poco::File restore_file(metadata_path + restore_file_name);
Poco::File restore_file(metadata_path + RESTORE_FILE_NAME);
restore_file.remove();
saveSchemaVersion(RESTORABLE_SCHEMA_VERSION);
LOG_INFO(&Poco::Logger::get("DiskS3"), "Restore disk {} finished", name);
}
catch (const Exception & e)
@ -1186,7 +1325,11 @@ void DiskS3::processRestoreFiles(const String & source_bucket, const String & so
/// Restore file if object has 'path' in metadata.
auto path_entry = object_metadata.find("path");
if (path_entry == object_metadata.end())
throw Exception("Failed to restore key " + key + " because it doesn't have 'path' in metadata", ErrorCodes::S3_ERROR);
{
/// Such keys can remain after migration, we can skip them.
LOG_WARNING(&Poco::Logger::get("DiskS3"), "Skip key {} because it doesn't have 'path' in metadata", key);
continue;
}
const auto & path = path_entry->second;

View File

@ -25,6 +25,7 @@ class DiskS3 : public IDisk
{
public:
using ObjectMetadata = std::map<std::string, std::string>;
using Futures = std::vector<std::future<void>>;
friend class DiskS3Reservation;
@ -149,7 +150,16 @@ private:
void createFileOperationObject(const String & operation_name, UInt64 revision, const ObjectMetadata & metadata);
static String revisionToString(UInt64 revision);
bool checkObjectExists(const String & prefix);
bool checkObjectExists(const String & source_bucket, const String & prefix);
void findLastRevision();
int readSchemaVersion(const String & source_bucket, const String & source_path);
void saveSchemaVersion(const int & version);
void updateObjectMetadata(const String & key, const ObjectMetadata & metadata);
void migrateFileToRestorableSchema(const String & path);
void migrateToRestorableSchemaRecursive(const String & path, Futures & results);
void migrateToRestorableSchema();
Aws::S3::Model::HeadObjectResult headObject(const String & source_bucket, const String & key);
void listObjects(const String & source_bucket, const String & source_path, std::function<bool(const Aws::S3::Model::ListObjectsV2Result &)> callback);
void copyObject(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key);
@ -169,7 +179,7 @@ private:
std::shared_ptr<S3::ProxyConfiguration> proxy_configuration;
const String bucket;
const String s3_root_path;
const String metadata_path;
String metadata_path;
size_t min_upload_part_size;
size_t max_single_part_upload_size;
size_t min_bytes_for_seek;
@ -180,16 +190,23 @@ private:
std::mutex reservation_mutex;
std::atomic<UInt64> revision_counter;
static constexpr UInt64 LATEST_REVISION = (static_cast<UInt64>(1)) << 63;
static constexpr UInt64 LATEST_REVISION = std::numeric_limits<UInt64>::max();
static constexpr UInt64 UNKNOWN_REVISION = 0;
/// File at path {metadata_path}/restore contains metadata restore information
const String restore_file_name = "restore";
inline static const String RESTORE_FILE_NAME = "restore";
/// The number of keys listed in one request (1000 is max value)
int list_object_keys_size;
/// Key has format: ../../r{revision}-{operation}
const re2::RE2 key_regexp {".*/r(\\d+)-(\\w+).*"};
/// Object contains information about schema version.
inline static const String SCHEMA_VERSION_OBJECT = ".SCHEMA_VERSION";
/// Version with possibility to backup-restore metadata.
static constexpr int RESTORABLE_SCHEMA_VERSION = 1;
/// Directories with data.
const std::vector<String> data_roots {"data", "store"};
};
}

View File

@ -22,6 +22,11 @@ namespace ErrorCodes
}
/** Generates array
* range(size): [0, size)
* range(start, end): [start, end)
* range(start, end, step): [start, end) with step increments.
*/
class FunctionRange : public IFunction
{
public:
@ -40,9 +45,9 @@ private:
{
if (arguments.size() > 3 || arguments.empty())
{
throw Exception{"Function " + getName() + " needs 1..3 arguments; passed "
+ std::to_string(arguments.size()) + ".",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH};
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Function {} needs 1..3 arguments; passed {}.",
getName(), arguments.size());
}
for (const auto & arg : arguments)
@ -339,6 +344,18 @@ private:
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override
{
DataTypePtr elem_type = checkAndGetDataType<DataTypeArray>(result_type.get())->getNestedType();
WhichDataType which(elem_type);
if (!which.isUInt8()
&& !which.isUInt16()
&& !which.isUInt32()
&& !which.isUInt64())
{
throw Exception{"Illegal columns of arguments of function " + getName()
+ ", the function only implemented for unsigned integers up to 64 bit", ErrorCodes::ILLEGAL_COLUMN};
}
ColumnPtr res;
if (arguments.size() == 1)
{
@ -356,22 +373,24 @@ private:
Columns columns_holder(3);
ColumnRawPtrs column_ptrs(3);
const auto return_type = checkAndGetDataType<DataTypeArray>(result_type.get())->getNestedType();
for (size_t i = 0; i < arguments.size(); ++i)
{
if (i == 1)
columns_holder[i] = castColumn(arguments[i], return_type)->convertToFullColumnIfConst();
columns_holder[i] = castColumn(arguments[i], elem_type)->convertToFullColumnIfConst();
else
columns_holder[i] = castColumn(arguments[i], return_type);
columns_holder[i] = castColumn(arguments[i], elem_type);
column_ptrs[i] = columns_holder[i].get();
}
// for step column, defaults to 1
/// Step is one by default.
if (arguments.size() == 2)
{
columns_holder[2] = return_type->createColumnConst(input_rows_count, 1);
/// Convert a column with constant 1 to the result type.
columns_holder[2] = castColumn(
{DataTypeUInt8().createColumnConst(input_rows_count, 1), std::make_shared<DataTypeUInt8>(), {}},
elem_type);
column_ptrs[2] = columns_holder[2].get();
}
@ -385,7 +404,9 @@ private:
if ((res = executeConstStartStep<UInt8>(column_ptrs[1], start, step, input_rows_count)) ||
(res = executeConstStartStep<UInt16>(column_ptrs[1], start, step, input_rows_count)) ||
(res = executeConstStartStep<UInt32>(column_ptrs[1], start, step, input_rows_count)) ||
(res = executeConstStartStep<UInt64>(column_ptrs[1], start, step, input_rows_count))) {}
(res = executeConstStartStep<UInt64>(column_ptrs[1], start, step, input_rows_count)))
{
}
}
else if (is_start_const && !is_step_const)
{
@ -394,7 +415,9 @@ private:
if ((res = executeConstStart<UInt8>(column_ptrs[1], column_ptrs[2], start, input_rows_count)) ||
(res = executeConstStart<UInt16>(column_ptrs[1], column_ptrs[2], start, input_rows_count)) ||
(res = executeConstStart<UInt32>(column_ptrs[1], column_ptrs[2], start, input_rows_count)) ||
(res = executeConstStart<UInt64>(column_ptrs[1], column_ptrs[2], start, input_rows_count))) {}
(res = executeConstStart<UInt64>(column_ptrs[1], column_ptrs[2], start, input_rows_count)))
{
}
}
else if (!is_start_const && is_step_const)
{
@ -403,14 +426,18 @@ private:
if ((res = executeConstStep<UInt8>(column_ptrs[0], column_ptrs[1], step, input_rows_count)) ||
(res = executeConstStep<UInt16>(column_ptrs[0], column_ptrs[1], step, input_rows_count)) ||
(res = executeConstStep<UInt32>(column_ptrs[0], column_ptrs[1], step, input_rows_count)) ||
(res = executeConstStep<UInt64>(column_ptrs[0], column_ptrs[1], step, input_rows_count))) {}
(res = executeConstStep<UInt64>(column_ptrs[0], column_ptrs[1], step, input_rows_count)))
{
}
}
else
{
if ((res = executeGeneric<UInt8>(column_ptrs[0], column_ptrs[1], column_ptrs[2], input_rows_count)) ||
(res = executeGeneric<UInt16>(column_ptrs[0], column_ptrs[1], column_ptrs[2], input_rows_count)) ||
(res = executeGeneric<UInt32>(column_ptrs[0], column_ptrs[1], column_ptrs[2], input_rows_count)) ||
(res = executeGeneric<UInt64>(column_ptrs[0], column_ptrs[1], column_ptrs[2], input_rows_count))) {}
(res = executeGeneric<UInt64>(column_ptrs[0], column_ptrs[1], column_ptrs[2], input_rows_count)))
{
}
}
if (!res)

View File

@ -7,15 +7,15 @@
namespace DB
{
/// Get the connection ID. It's used for MySQL handler only.
class FunctionConnectionID : public IFunction
/// Get the connection Id. It's used for MySQL handler only.
class FunctionConnectionId : public IFunction
{
public:
static constexpr auto name = "connectionID";
static constexpr auto name = "connectionId";
explicit FunctionConnectionID(const Context & context_) : context(context_) {}
explicit FunctionConnectionId(const Context & context_) : context(context_) {}
static FunctionPtr create(const Context & context) { return std::make_shared<FunctionConnectionID>(context); }
static FunctionPtr create(const Context & context) { return std::make_shared<FunctionConnectionId>(context); }
String getName() const override { return name; }
@ -32,9 +32,9 @@ private:
const Context & context;
};
void registerFunctionConnectionID(FunctionFactory & factory)
void registerFunctionConnectionId(FunctionFactory & factory)
{
factory.registerFunction<FunctionConnectionID>(FunctionFactory::CaseInsensitive);
factory.registerFunction<FunctionConnectionId>(FunctionFactory::CaseInsensitive);
factory.registerAlias("connection_id", "connectionID", FunctionFactory::CaseInsensitive);
}

View File

@ -0,0 +1,70 @@
#include <memory>
#include <Columns/ColumnString.h>
#include <Core/Block.h>
#include <DataTypes/DataTypeString.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunctionImpl.h>
#include <Storages/MergeTree/MergeTreePartition.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
/** partitionId(x, y, ...) is a function that computes partition ids of arguments.
* The function is slow and should not be called for large amount of rows.
*/
class FunctionPartitionId : public IFunction
{
public:
static constexpr auto name = "partitionId";
static FunctionPtr create(const Context &) { return std::make_shared<FunctionPartitionId>(); }
String getName() const override { return name; }
bool isVariadic() const override { return true; }
size_t getNumberOfArguments() const override { return 0; }
bool isInjective(const ColumnsWithTypeAndName & /*sample_columns*/) const override { return true; }
bool useDefaultImplementationForNulls() const override { return true; }
bool useDefaultImplementationForConstants() const override { return true; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (arguments.empty())
throw Exception("Function " + getName() + " requires at least one argument.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return std::make_shared<DataTypeString>();
}
virtual ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
Block sample_block(arguments);
size_t size = arguments.size();
auto result_column = ColumnString::create();
for (size_t j = 0; j < input_rows_count; ++j)
{
Row row(size);
for (size_t i = 0; i < size; ++i)
arguments[i].column->get(j, row[i]);
MergeTreePartition partition(std::move(row));
result_column->insert(partition.getID(sample_block));
}
return result_column;
}
};
void registerFunctionPartitionId(FunctionFactory & factory)
{
factory.registerFunction<FunctionPartitionId>();
}
}

View File

@ -70,7 +70,8 @@ void registerFunctionErrorCodeToName(FunctionFactory &);
void registerFunctionTcpPort(FunctionFactory &);
void registerFunctionByteSize(FunctionFactory &);
void registerFunctionFile(FunctionFactory & factory);
void registerFunctionConnectionID(FunctionFactory & factory);
void registerFunctionConnectionId(FunctionFactory & factory);
void registerFunctionPartitionId(FunctionFactory & factory);
void registerFunctionIsIPAddressContainedIn(FunctionFactory &);
#if USE_ICU
@ -142,7 +143,8 @@ void registerFunctionsMiscellaneous(FunctionFactory & factory)
registerFunctionTcpPort(factory);
registerFunctionByteSize(factory);
registerFunctionFile(factory);
registerFunctionConnectionID(factory);
registerFunctionConnectionId(factory);
registerFunctionPartitionId(factory);
registerFunctionIsIPAddressContainedIn(factory);
#if USE_ICU

View File

@ -210,7 +210,7 @@ SRCS(
cbrt.cpp
coalesce.cpp
concat.cpp
connectionID.cpp
connectionId.cpp
convertCharset.cpp
cos.cpp
cosh.cpp
@ -374,6 +374,7 @@ SRCS(
now.cpp
now64.cpp
nullIf.cpp
partitionId.cpp
pi.cpp
plus.cpp
pointInEllipses.cpp

View File

@ -344,6 +344,17 @@ String DatabaseReplicatedTask::getShardID() const
return database->shard_name;
}
void DatabaseReplicatedTask::parseQueryFromEntry(const Context & context)
{
DDLTaskBase::parseQueryFromEntry(context);
if (auto * ddl_query = dynamic_cast<ASTQueryWithTableAndOutput *>(query.get()))
{
/// Update database name with actual name of local database
assert(ddl_query->database.empty());
ddl_query->database = database->getDatabaseName();
}
}
std::unique_ptr<Context> DatabaseReplicatedTask::makeQueryContext(Context & from_context, const ZooKeeperPtr & zookeeper)
{
auto query_context = DDLTaskBase::makeQueryContext(from_context, zookeeper);

View File

@ -93,7 +93,7 @@ struct DDLTaskBase
DDLTaskBase(const DDLTaskBase &) = delete;
virtual ~DDLTaskBase() = default;
void parseQueryFromEntry(const Context & context);
virtual void parseQueryFromEntry(const Context & context);
virtual String getShardID() const = 0;
@ -134,6 +134,7 @@ struct DatabaseReplicatedTask : public DDLTaskBase
DatabaseReplicatedTask(const String & name, const String & path, DatabaseReplicated * database_);
String getShardID() const override;
void parseQueryFromEntry(const Context & context) override;
std::unique_ptr<Context> makeQueryContext(Context & from_context, const ZooKeeperPtr & zookeeper) override;
DatabaseReplicated * database;

View File

@ -201,7 +201,7 @@ void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr
void ExecuteScalarSubqueriesMatcher::visit(const ASTFunction & func, ASTPtr & ast, Data & data)
{
/// Don't descend into subqueries in arguments of IN operator.
/// But if an argument is not subquery, than deeper may be scalar subqueries and we need to descend in them.
/// But if an argument is not subquery, then deeper may be scalar subqueries and we need to descend in them.
std::vector<ASTPtr *> out;
if (checkFunctionIsInOrGlobalInOperator(func))

View File

@ -299,7 +299,7 @@ void ExpressionAnalyzer::initGlobalSubqueriesAndExternalTables(bool do_global)
}
void SelectQueryExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_or_table_name)
void ExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_or_table_name, const SelectQueryOptions & query_options)
{
auto set_key = PreparedSetKey::forSubquery(*subquery_or_table_name);
@ -335,7 +335,7 @@ void SelectQueryExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr
prepared_sets[set_key] = std::move(set);
}
SetPtr SelectQueryExpressionAnalyzer::isPlainStorageSetInSubquery(const ASTPtr & subquery_or_table_name)
SetPtr ExpressionAnalyzer::isPlainStorageSetInSubquery(const ASTPtr & subquery_or_table_name)
{
const auto * table = subquery_or_table_name->as<ASTIdentifier>();
if (!table)
@ -381,7 +381,7 @@ void SelectQueryExpressionAnalyzer::makeSetsForIndex(const ASTPtr & node)
if (arg->as<ASTSubquery>() || arg->as<ASTIdentifier>())
{
if (settings.use_index_for_in_with_subqueries)
tryMakeSetForIndexFromSubquery(arg);
tryMakeSetForIndexFromSubquery(arg, query_options);
}
else
{
@ -1334,9 +1334,9 @@ ExpressionActionsPtr ExpressionAnalyzer::getActions(bool add_aliases, bool proje
}
ExpressionActionsPtr ExpressionAnalyzer::getConstActions()
ExpressionActionsPtr ExpressionAnalyzer::getConstActions(const ColumnsWithTypeAndName & constant_inputs)
{
auto actions = std::make_shared<ActionsDAG>(NamesAndTypesList());
auto actions = std::make_shared<ActionsDAG>(constant_inputs);
getRootActions(query, true, actions, true);
return std::make_shared<ExpressionActions>(actions, ExpressionActionsSettings::fromContext(context));

View File

@ -111,7 +111,7 @@ public:
/// Actions that can be performed on an empty block: adding constants and applying functions that depend only on constants.
/// Does not execute subqueries.
ExpressionActionsPtr getConstActions();
ExpressionActionsPtr getConstActions(const ColumnsWithTypeAndName & constant_inputs = {});
/** Sets that require a subquery to be create.
* Only the sets needed to perform actions returned from already executed `append*` or `getActions`.
@ -128,6 +128,19 @@ public:
void makeWindowDescriptions(ActionsDAGPtr actions);
/**
* Create Set from a subquery or a table expression in the query. The created set is suitable for using the index.
* The set will not be created if its size hits the limit.
*/
void tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_or_table_name, const SelectQueryOptions & query_options = {});
/**
* Checks if subquery is not a plain StorageSet.
* Because while making set we will read data from StorageSet which is not allowed.
* Returns valid SetPtr from StorageSet if the latter is used after IN or nullptr otherwise.
*/
SetPtr isPlainStorageSetInSubquery(const ASTPtr & subquery_or_table_name);
protected:
ExpressionAnalyzer(
const ASTPtr & query_,
@ -299,19 +312,6 @@ private:
NameSet required_result_columns;
SelectQueryOptions query_options;
/**
* Create Set from a subquery or a table expression in the query. The created set is suitable for using the index.
* The set will not be created if its size hits the limit.
*/
void tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_or_table_name);
/**
* Checks if subquery is not a plain StorageSet.
* Because while making set we will read data from StorageSet which is not allowed.
* Returns valid SetPtr from StorageSet if the latter is used after IN or nullptr otherwise.
*/
SetPtr isPlainStorageSetInSubquery(const ASTPtr & subquery_or_table_name);
/// Create Set-s that we make from IN section to use index on them.
void makeSetsForIndex(const ASTPtr & node);

View File

@ -48,6 +48,7 @@ BlockIO InterpreterAlterQuery::execute()
context.checkAccess(getRequiredAccess());
auto table_id = context.resolveStorageID(alter, Context::ResolveOrdinary);
query_ptr->as<ASTAlterQuery &>().database = table_id.database_name;
DatabasePtr database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
if (typeid_cast<DatabaseReplicated *>(database.get()) && context.getClientInfo().query_kind != ClientInfo::QueryKind::SECONDARY_QUERY)

View File

@ -82,7 +82,7 @@ void InterpreterDropQuery::waitForTableToBeActuallyDroppedOrDetached(const ASTDr
db->waitDetachedTableNotInUse(uuid_to_wait);
}
BlockIO InterpreterDropQuery::executeToTable(const ASTDropQuery & query)
BlockIO InterpreterDropQuery::executeToTable(ASTDropQuery & query)
{
DatabasePtr database;
UUID table_to_wait_on = UUIDHelpers::Nil;
@ -92,7 +92,7 @@ BlockIO InterpreterDropQuery::executeToTable(const ASTDropQuery & query)
return res;
}
BlockIO InterpreterDropQuery::executeToTableImpl(const ASTDropQuery & query, DatabasePtr & db, UUID & uuid_to_wait)
BlockIO InterpreterDropQuery::executeToTableImpl(ASTDropQuery & query, DatabasePtr & db, UUID & uuid_to_wait)
{
/// NOTE: it does not contain UUID, we will resolve it with locked DDLGuard
auto table_id = StorageID(query);
@ -101,7 +101,7 @@ BlockIO InterpreterDropQuery::executeToTableImpl(const ASTDropQuery & query, Dat
if (context.tryResolveStorageID(table_id, Context::ResolveExternal))
return executeToTemporaryTable(table_id.getTableName(), query.kind);
else
table_id.database_name = context.getCurrentDatabase();
query.database = table_id.database_name = context.getCurrentDatabase();
}
if (query.temporary)
@ -212,7 +212,7 @@ BlockIO InterpreterDropQuery::executeToDictionary(
String database_name = context.resolveDatabase(database_name_);
auto ddl_guard = (!no_ddl_lock ? DatabaseCatalog::instance().getDDLGuard(database_name, dictionary_name) : nullptr);
query_ptr->as<ASTDropQuery>()->database = database_name;
DatabasePtr database = tryGetDatabase(database_name, if_exists);
bool is_drop_or_detach_database = query_ptr->as<ASTDropQuery>()->table.empty();

View File

@ -34,8 +34,8 @@ private:
BlockIO executeToDatabase(const ASTDropQuery & query);
BlockIO executeToDatabaseImpl(const ASTDropQuery & query, DatabasePtr & database, std::vector<UUID> & uuids_to_wait);
BlockIO executeToTable(const ASTDropQuery & query);
BlockIO executeToTableImpl(const ASTDropQuery & query, DatabasePtr & db, UUID & uuid_to_wait);
BlockIO executeToTable(ASTDropQuery & query);
BlockIO executeToTableImpl(ASTDropQuery & query, DatabasePtr & db, UUID & uuid_to_wait);
static void waitForTableToBeActuallyDroppedOrDetached(const ASTDropQuery & query, const DatabasePtr & db, const UUID & uuid_to_wait);

View File

@ -662,7 +662,10 @@ void TreeRewriterResult::collectUsedColumns(const ASTPtr & query, bool is_select
const auto & partition_desc = metadata_snapshot->getPartitionKey();
if (partition_desc.expression)
{
const auto & partition_source_columns = partition_desc.expression->getRequiredColumns();
auto partition_source_columns = partition_desc.expression->getRequiredColumns();
partition_source_columns.push_back("_part");
partition_source_columns.push_back("_partition_id");
partition_source_columns.push_back("_part_uuid");
optimize_trivial_count = true;
for (const auto & required_column : required)
{

View File

@ -42,6 +42,7 @@
#include <Storages/MergeTree/localBackup.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/VirtualColumnUtils.h>
#include <Common/Increment.h>
#include <Common/SimpleIncrement.h>
#include <Common/Stopwatch.h>
@ -681,6 +682,41 @@ void MergeTreeData::MergingParams::check(const StorageInMemoryMetadata & metadat
}
std::optional<UInt64> MergeTreeData::totalRowsByPartitionPredicateImpl(
const SelectQueryInfo & query_info, const Context & context, const DataPartsVector & parts) const
{
auto metadata_snapshot = getInMemoryMetadataPtr();
ASTPtr expression_ast;
Block virtual_columns_block = MergeTreeDataSelectExecutor::getSampleBlockWithVirtualPartColumns();
// Generate valid expressions for filtering
bool valid = VirtualColumnUtils::prepareFilterBlockWithQuery(query_info.query, context, virtual_columns_block, expression_ast);
PartitionPruner partition_pruner(metadata_snapshot->getPartitionKey(), query_info, context, true /* strict */);
if (partition_pruner.isUseless() && !valid)
return {};
std::unordered_set<String> part_values;
if (valid && expression_ast)
{
MergeTreeDataSelectExecutor::fillBlockWithVirtualPartColumns(parts, virtual_columns_block);
VirtualColumnUtils::filterBlockWithQuery(query_info.query, virtual_columns_block, context, expression_ast);
part_values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");
if (part_values.empty())
return 0;
}
// At this point, empty `part_values` means all parts.
size_t res = 0;
for (const auto & part : parts)
{
if ((part_values.empty() || part_values.find(part->name) != part_values.end()) && !partition_pruner.canBePruned(part))
res += part->rows_count;
}
return res;
}
String MergeTreeData::MergingParams::getModeName() const
{
switch (mode)

View File

@ -893,6 +893,9 @@ protected:
return {begin, end};
}
std::optional<UInt64> totalRowsByPartitionPredicateImpl(
const SelectQueryInfo & query_info, const Context & context, const DataPartsVector & parts) const;
static decltype(auto) getStateModifier(DataPartState state)
{
return [state] (const DataPartPtr & part) { part->setState(state); };

View File

@ -39,6 +39,7 @@
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypesNumber.h>
#include <Storages/VirtualColumnUtils.h>
#include <DataStreams/materializeBlock.h>
namespace ProfileEvents
{
@ -71,28 +72,30 @@ MergeTreeDataSelectExecutor::MergeTreeDataSelectExecutor(const MergeTreeData & d
}
/// Construct a block consisting only of possible values of virtual columns
static Block getBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, bool with_uuid)
Block MergeTreeDataSelectExecutor::getSampleBlockWithVirtualPartColumns()
{
auto part_column = ColumnString::create();
auto part_uuid_column = ColumnUUID::create();
return Block(std::initializer_list<ColumnWithTypeAndName>{
ColumnWithTypeAndName(ColumnString::create(), std::make_shared<DataTypeString>(), "_part"),
ColumnWithTypeAndName(ColumnString::create(), std::make_shared<DataTypeString>(), "_partition_id"),
ColumnWithTypeAndName(ColumnUUID::create(), std::make_shared<DataTypeUUID>(), "_part_uuid")});
}
void MergeTreeDataSelectExecutor::fillBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, Block & block)
{
MutableColumns columns = block.mutateColumns();
auto & part_column = columns[0];
auto & partition_id_column = columns[1];
auto & part_uuid_column = columns[2];
for (const auto & part : parts)
{
part_column->insert(part->name);
if (with_uuid)
part_uuid_column->insert(part->uuid);
partition_id_column->insert(part->info.partition_id);
part_uuid_column->insert(part->uuid);
}
if (with_uuid)
{
return Block(std::initializer_list<ColumnWithTypeAndName>{
ColumnWithTypeAndName(std::move(part_column), std::make_shared<DataTypeString>(), "_part"),
ColumnWithTypeAndName(std::move(part_uuid_column), std::make_shared<DataTypeUUID>(), "_part_uuid"),
});
}
return Block{ColumnWithTypeAndName(std::move(part_column), std::make_shared<DataTypeString>(), "_part")};
block.setColumns(std::move(columns));
}
@ -176,8 +179,6 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
Names real_column_names;
size_t total_parts = parts.size();
bool part_column_queried = false;
bool part_uuid_column_queried = false;
bool sample_factor_column_queried = false;
Float64 used_sample_factor = 1;
@ -186,7 +187,6 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
{
if (name == "_part")
{
part_column_queried = true;
virt_column_names.push_back(name);
}
else if (name == "_part_index")
@ -199,7 +199,6 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
}
else if (name == "_part_uuid")
{
part_uuid_column_queried = true;
virt_column_names.push_back(name);
}
else if (name == "_sample_factor")
@ -219,12 +218,23 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
if (real_column_names.empty())
real_column_names.push_back(ExpressionActions::getSmallestColumn(available_real_columns));
/// If `_part` or `_part_uuid` virtual columns are requested, we try to filter out data by them.
Block virtual_columns_block = getBlockWithVirtualPartColumns(parts, part_uuid_column_queried);
if (part_column_queried || part_uuid_column_queried)
VirtualColumnUtils::filterBlockWithQuery(query_info.query, virtual_columns_block, context);
std::unordered_set<String> part_values;
ASTPtr expression_ast;
auto virtual_columns_block = getSampleBlockWithVirtualPartColumns();
auto part_values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");
// Generate valid expressions for filtering
VirtualColumnUtils::prepareFilterBlockWithQuery(query_info.query, context, virtual_columns_block, expression_ast);
// If there is still something left, fill the virtual block and do the filtering.
if (expression_ast)
{
fillBlockWithVirtualPartColumns(parts, virtual_columns_block);
VirtualColumnUtils::filterBlockWithQuery(query_info.query, virtual_columns_block, context, expression_ast);
part_values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");
if (part_values.empty())
return std::make_unique<QueryPlan>();
}
// At this point, empty `part_values` means all parts.
metadata_snapshot->check(real_column_names, data.getVirtuals(), data.getStorageID());
@ -373,7 +383,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
{
LOG_DEBUG(log, "Will use no data on this replica because parallel replicas processing has been requested"
" (the setting 'max_parallel_replicas') but the table does not support sampling and this replica is not the first.");
return {};
return std::make_unique<QueryPlan>();
}
bool use_sampling = relative_sample_size > 0 || (settings.parallel_replicas_count > 1 && data.supportsSampling());
@ -1894,7 +1904,7 @@ void MergeTreeDataSelectExecutor::selectPartsToRead(
for (const auto & part : prev_parts)
{
if (part_values.find(part->name) == part_values.end())
if (!part_values.empty() && part_values.find(part->name) == part_values.end())
continue;
if (part->isEmpty())
@ -1945,7 +1955,7 @@ void MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter(
for (const auto & part : prev_parts)
{
if (part_values.find(part->name) == part_values.end())
if (!part_values.empty() && part_values.find(part->name) == part_values.end())
continue;
if (part->isEmpty())

View File

@ -44,6 +44,12 @@ public:
unsigned num_streams,
const PartitionIdToMaxBlock * max_block_numbers_to_read = nullptr) const;
/// Construct a sample block consisting only of possible virtual columns for part pruning.
static Block getSampleBlockWithVirtualPartColumns();
/// Fill in values of possible virtual columns for part pruning.
static void fillBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, Block & block);
private:
const MergeTreeData & data;

View File

@ -20,6 +20,7 @@
#include <AggregateFunctions/parseAggregateFunctionParameters.h>
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
namespace DB
@ -410,6 +411,35 @@ static StoragePtr create(const StorageFactory::Arguments & args)
throw Exception(msg, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
if (is_extended_storage_def)
{
/// Allow expressions in engine arguments.
/// In new syntax argument can be literal or identifier or array/tuple of identifiers.
size_t arg_idx = 0;
try
{
for (; arg_idx < engine_args.size(); ++arg_idx)
{
auto & arg = engine_args[arg_idx];
auto * arg_func = arg->as<ASTFunction>();
if (!arg_func)
continue;
/// If we got ASTFunction, let's evaluate it and replace with ASTLiteral.
/// Do not try evaluate array or tuple, because it's array or tuple of column identifiers.
if (arg_func->name == "array" || arg_func->name == "tuple")
continue;
Field value = evaluateConstantExpression(arg, args.local_context).first;
arg = std::make_shared<ASTLiteral>(value);
}
}
catch (Exception & e)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot evaluate engine argument {}: {} {}",
arg_idx, e.message(), getMergeTreeVerboseHelp(is_extended_storage_def));
}
}
/// For Replicated.
String zookeeper_path;
String replica_name;

View File

@ -219,8 +219,8 @@ Pipe StorageMerge::read(
/** First we make list of selected tables to find out its size.
* This is necessary to correctly pass the recommended number of threads to each table.
*/
StorageListWithLocks selected_tables = getSelectedTables(
query_info.query, has_table_virtual_column, context.getCurrentQueryId(), context.getSettingsRef());
StorageListWithLocks selected_tables
= getSelectedTables(query_info, has_table_virtual_column, context.getCurrentQueryId(), context.getSettingsRef());
if (selected_tables.empty())
/// FIXME: do we support sampling in this case?
@ -409,8 +409,9 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const String
StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(
const ASTPtr & query, bool has_virtual_column, const String & query_id, const Settings & settings) const
const SelectQueryInfo & query_info, bool has_virtual_column, const String & query_id, const Settings & settings) const
{
const ASTPtr & query = query_info.query;
StorageListWithLocks selected_tables;
DatabaseTablesIteratorPtr iterator = getDatabaseIterator(global_context);
@ -438,7 +439,7 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(
if (has_virtual_column)
{
Block virtual_columns_block = Block{ColumnWithTypeAndName(std::move(virtual_column), std::make_shared<DataTypeString>(), "_table")};
VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, global_context);
VirtualColumnUtils::filterBlockWithQuery(query_info.query, virtual_columns_block, global_context);
auto values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_table");
/// Remove unused tables from the list

View File

@ -59,7 +59,7 @@ private:
StorageListWithLocks getSelectedTables(const String & query_id, const Settings & settings) const;
StorageMerge::StorageListWithLocks getSelectedTables(
const ASTPtr & query, bool has_virtual_column, const String & query_id, const Settings & settings) const;
const SelectQueryInfo & query_info, bool has_virtual_column, const String & query_id, const Settings & settings) const;
template <typename F>
StoragePtr getFirstTable(F && predicate) const;

View File

@ -212,18 +212,8 @@ std::optional<UInt64> StorageMergeTree::totalRows(const Settings &) const
std::optional<UInt64> StorageMergeTree::totalRowsByPartitionPredicate(const SelectQueryInfo & query_info, const Context & context) const
{
auto metadata_snapshot = getInMemoryMetadataPtr();
PartitionPruner partition_pruner(metadata_snapshot->getPartitionKey(), query_info, context, true /* strict */);
if (partition_pruner.isUseless())
return {};
size_t res = 0;
auto lock = lockParts();
for (const auto & part : getDataPartsStateRange(DataPartState::Committed))
{
if (!partition_pruner.canBePruned(part))
res += part->rows_count;
}
return res;
auto parts = getDataPartsVector({DataPartState::Committed});
return totalRowsByPartitionPredicateImpl(query_info, context, parts);
}
std::optional<UInt64> StorageMergeTree::totalBytes(const Settings &) const

View File

@ -4116,17 +4116,9 @@ std::optional<UInt64> StorageReplicatedMergeTree::totalRows(const Settings & set
std::optional<UInt64> StorageReplicatedMergeTree::totalRowsByPartitionPredicate(const SelectQueryInfo & query_info, const Context & context) const
{
auto metadata_snapshot = getInMemoryMetadataPtr();
PartitionPruner partition_pruner(metadata_snapshot->getPartitionKey(), query_info, context, true /* strict */);
if (partition_pruner.isUseless())
return {};
size_t res = 0;
foreachCommittedParts([&](auto & part)
{
if (!partition_pruner.canBePruned(part))
res += part->rows_count;
}, context.getSettingsRef().select_sequential_consistency);
return res;
DataPartsVector parts;
foreachCommittedParts([&](auto & part) { parts.push_back(part); }, context.getSettingsRef().select_sequential_consistency);
return totalRowsByPartitionPredicateImpl(query_info, context, parts);
}
std::optional<UInt64> StorageReplicatedMergeTree::totalBytes(const Settings & settings) const

View File

@ -62,7 +62,7 @@ StorageSystemTables::StorageSystemTables(const StorageID & table_id_)
}
static ColumnPtr getFilteredDatabases(const ASTPtr & query, const Context & context)
static ColumnPtr getFilteredDatabases(const SelectQueryInfo & query_info, const Context & context)
{
MutableColumnPtr column = ColumnString::create();
@ -76,7 +76,7 @@ static ColumnPtr getFilteredDatabases(const ASTPtr & query, const Context & cont
}
Block block { ColumnWithTypeAndName(std::move(column), std::make_shared<DataTypeString>(), "database") };
VirtualColumnUtils::filterBlockWithQuery(query, block, context);
VirtualColumnUtils::filterBlockWithQuery(query_info.query, block, context);
return block.getByPosition(0).column;
}
@ -525,7 +525,7 @@ Pipe StorageSystemTables::read(
}
}
ColumnPtr filtered_databases_column = getFilteredDatabases(query_info.query, context);
ColumnPtr filtered_databases_column = getFilteredDatabases(query_info, context);
return Pipe(std::make_shared<TablesBlockSource>(
std::move(columns_mask), std::move(res_block), max_block_size, std::move(filtered_databases_column), context));

View File

@ -5,12 +5,14 @@
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/IdentifierSemantic.h>
#include <Interpreters/misc.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSubquery.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnsCommon.h>
@ -19,6 +21,7 @@
#include <Storages/VirtualColumnUtils.h>
#include <IO/WriteHelpers.h>
#include <Common/typeid_cast.h>
#include <Interpreters/ActionsVisitor.h>
namespace DB
@ -28,31 +31,36 @@ namespace
{
/// Verifying that the function depends only on the specified columns
bool isValidFunction(const ASTPtr & expression, const NameSet & columns)
bool isValidFunction(const ASTPtr & expression, const std::function<bool(const ASTPtr &)> & is_constant)
{
for (const auto & child : expression->children)
if (!isValidFunction(child, columns))
return false;
if (auto opt_name = IdentifierSemantic::getColumnName(expression))
return columns.count(*opt_name);
return true;
const auto * function = expression->as<ASTFunction>();
if (function && functionIsInOrGlobalInOperator(function->name))
{
// Second argument of IN can be a scalar subquery
return isValidFunction(function->arguments->children[0], is_constant);
}
else
return is_constant(expression);
}
/// Extract all subfunctions of the main conjunction, but depending only on the specified columns
void extractFunctions(const ASTPtr & expression, const NameSet & columns, std::vector<ASTPtr> & result)
bool extractFunctions(const ASTPtr & expression, const std::function<bool(const ASTPtr &)> & is_constant, std::vector<ASTPtr> & result)
{
const auto * function = expression->as<ASTFunction>();
if (function && function->name == "and")
if (function && (function->name == "and" || function->name == "indexHint"))
{
bool ret = true;
for (const auto & child : function->arguments->children)
extractFunctions(child, columns, result);
ret &= extractFunctions(child, is_constant, result);
return ret;
}
else if (isValidFunction(expression, columns))
else if (isValidFunction(expression, is_constant))
{
result.push_back(expression->clone());
return true;
}
else
return false;
}
/// Construct a conjunction from given functions
@ -65,6 +73,25 @@ ASTPtr buildWhereExpression(const ASTs & functions)
return makeASTFunction("and", functions);
}
void buildSets(const ASTPtr & expression, ExpressionAnalyzer & analyzer)
{
const auto * func = expression->as<ASTFunction>();
if (func && functionIsInOrGlobalInOperator(func->name))
{
const IAST & args = *func->arguments;
const ASTPtr & arg = args.children.at(1);
if (arg->as<ASTSubquery>() || arg->as<ASTIdentifier>())
{
analyzer.tryMakeSetForIndexFromSubquery(arg);
}
}
else
{
for (const auto & child : expression->children)
buildSets(child, analyzer);
}
}
}
namespace VirtualColumnUtils
@ -76,7 +103,6 @@ void rewriteEntityInAst(ASTPtr ast, const String & column_name, const Field & va
if (!select.with())
select.setExpression(ASTSelectQuery::Expression::WITH, std::make_shared<ASTExpressionList>());
if (func.empty())
{
auto literal = std::make_shared<ASTLiteral>(value);
@ -96,30 +122,63 @@ void rewriteEntityInAst(ASTPtr ast, const String & column_name, const Field & va
}
}
void filterBlockWithQuery(const ASTPtr & query, Block & block, const Context & context)
bool prepareFilterBlockWithQuery(const ASTPtr & query, const Context & context, Block block, ASTPtr & expression_ast)
{
bool unmodified = true;
const auto & select = query->as<ASTSelectQuery &>();
if (!select.where() && !select.prewhere())
return;
return unmodified;
NameSet columns;
for (const auto & it : block.getNamesAndTypesList())
columns.insert(it.name);
ASTPtr condition_ast;
if (select.prewhere() && select.where())
condition_ast = makeASTFunction("and", select.prewhere()->clone(), select.where()->clone());
else
condition_ast = select.prewhere() ? select.prewhere()->clone() : select.where()->clone();
/// We will create an expression that evaluates the expressions in WHERE and PREWHERE, depending only on the existing columns.
// Prepare a constant block with valid expressions
for (size_t i = 0; i < block.columns(); ++i)
block.getByPosition(i).column = block.getByPosition(i).type->createColumnConstWithDefaultValue(1);
// Provide input columns as constant columns to check if an expression is constant.
std::function<bool(const ASTPtr &)> is_constant = [&block, &context](const ASTPtr & node)
{
auto actions = std::make_shared<ActionsDAG>(block.getColumnsWithTypeAndName());
PreparedSets prepared_sets;
SubqueriesForSets subqueries_for_sets;
ActionsVisitor::Data visitor_data(
context, SizeLimits{}, 1, {}, std::move(actions), prepared_sets, subqueries_for_sets, true, true, true, false);
ActionsVisitor(visitor_data).visit(node);
actions = visitor_data.getActions();
auto expression_actions = std::make_shared<ExpressionActions>(actions);
auto block_with_constants = block;
expression_actions->execute(block_with_constants);
auto column_name = node->getColumnName();
return block_with_constants.has(column_name) && isColumnConst(*block_with_constants.getByName(column_name).column);
};
/// Create an expression that evaluates the expressions in WHERE and PREWHERE, depending only on the existing columns.
std::vector<ASTPtr> functions;
if (select.where())
extractFunctions(select.where(), columns, functions);
unmodified &= extractFunctions(select.where(), is_constant, functions);
if (select.prewhere())
extractFunctions(select.prewhere(), columns, functions);
unmodified &= extractFunctions(select.prewhere(), is_constant, functions);
expression_ast = buildWhereExpression(functions);
return unmodified;
}
void filterBlockWithQuery(const ASTPtr & query, Block & block, const Context & context, ASTPtr expression_ast)
{
if (!expression_ast)
prepareFilterBlockWithQuery(query, context, block, expression_ast);
ASTPtr expression_ast = buildWhereExpression(functions);
if (!expression_ast)
return;
/// Let's analyze and calculate the expression.
/// Let's analyze and calculate the prepared expression.
auto syntax_result = TreeRewriter(context).analyze(expression_ast, block.getNamesAndTypesList());
ExpressionAnalyzer analyzer(expression_ast, syntax_result, context);
buildSets(expression_ast, analyzer);
ExpressionActionsPtr actions = analyzer.getActions(false);
Block block_with_filter = block;

View File

@ -4,6 +4,7 @@
#include <Core/Block.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/SelectQueryInfo.h>
namespace DB
@ -24,9 +25,14 @@ namespace VirtualColumnUtils
/// - `WITH toUInt16(9000) as _port`.
void rewriteEntityInAst(ASTPtr ast, const String & column_name, const Field & value, const String & func = "");
/// Prepare `expression_ast` to filter block. Returns true if `expression_ast` is not trimmed, that is,
/// `block` provides all needed columns for `expression_ast`, else return false.
bool prepareFilterBlockWithQuery(const ASTPtr & query, const Context & context, Block block, ASTPtr & expression_ast);
/// Leave in the block only the rows that fit under the WHERE clause and the PREWHERE clause of the query.
/// Only elements of the outer conjunction are considered, depending only on the columns present in the block.
void filterBlockWithQuery(const ASTPtr & query, Block & block, const Context & context);
/// If `expression_ast` is passed, use it to filter block.
void filterBlockWithQuery(const ASTPtr & query, Block & block, const Context & context, ASTPtr expression_ast = {});
/// Extract from the input stream a set of `name` column values
template <typename T>

View File

@ -108,7 +108,7 @@ def remove_control_characters(s):
def get_db_engine(args, database_name):
if args.replicated_database:
return " ENGINE=Replicated('/test/clickhouse/db/{}', 's1', 'r1')".format(database_name)
return " ON CLUSTER test_cluster_database_replicated ENGINE=Replicated('/test/clickhouse/db/{}', '{{shard}}', '{{replica}}')".format(database_name)
if args.db_engine:
return " ENGINE=" + args.db_engine
return "" # Will use default engine
@ -180,7 +180,10 @@ def run_single_test(args, ext, server_logs_level, client_options, case_file, std
clickhouse_proc_create = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE, universal_newlines=True)
seconds_left = max(args.timeout - (datetime.now() - start_time).total_seconds(), 20)
try:
clickhouse_proc_create.communicate(("DROP DATABASE " + database), timeout=seconds_left)
drop_database_query = "DROP DATABASE " + database
if args.replicated_database:
drop_database_query += " ON CLUSTER test_cluster_database_replicated"
clickhouse_proc_create.communicate((drop_database_query), timeout=seconds_left)
except TimeoutExpired:
# kill test process because it can also hung
if proc.returncode is None:
@ -201,6 +204,9 @@ def run_single_test(args, ext, server_logs_level, client_options, case_file, std
os.system("LC_ALL=C sed -i -e 's/{test_db}/default/g' {file}".format(test_db=database, file=stdout_file))
if not args.show_db_name:
os.system("LC_ALL=C sed -i -e 's/{test_db}/default/g' {file}".format(test_db=database, file=stderr_file))
if args.replicated_database:
os.system("LC_ALL=C sed -i -e 's|/auto_{{shard}}||g' {file}".format(file=stdout_file))
os.system("LC_ALL=C sed -i -e 's|auto_{{replica}}||g' {file}".format(file=stdout_file))
stdout = open(stdout_file, 'rb').read() if os.path.exists(stdout_file) else b''
stdout = str(stdout, errors='replace', encoding='utf-8')
@ -216,8 +222,12 @@ def need_retry(stderr):
def get_processlist(args):
try:
query = b"SHOW PROCESSLIST FORMAT Vertical"
if args.replicated_database:
query = b"SELECT materialize((hostName(), tcpPort())) as host, * " \
b"FROM clusterAllReplicas('r', system.processes) WHERE query NOT LIKE '%system.processes%' FORMAT Vertical"
clickhouse_proc = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE)
(stdout, _) = clickhouse_proc.communicate((b"SHOW PROCESSLIST FORMAT Vertical"), timeout=20)
(stdout, _) = clickhouse_proc.communicate((query), timeout=20)
return False, stdout.decode('utf-8')
except Exception as ex:
print("Exception", ex)
@ -849,6 +859,8 @@ def main(args):
if total_tests_run == 0:
print("No tests were run.")
sys.exit(1)
else:
print("All tests have finished.")
sys.exit(exit_code)

View File

@ -0,0 +1,77 @@
<yandex>
<zookeeper>
<node index="1">
<host>localhost</host>
<port>9181</port>
</node>
<node index="2">
<host>localhost</host>
<port>19181</port>
</node>
<node index="3">
<host>localhost</host>
<port>29181</port>
</node>
</zookeeper>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
<force_sync>false</force_sync>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>localhost</hostname>
<port>44444</port>
<can_become_leader>true</can_become_leader>
<priority>3</priority>
</server>
<server>
<id>2</id>
<hostname>localhost</hostname>
<port>44445</port>
<can_become_leader>true</can_become_leader>
<start_as_follower>true</start_as_follower>
<priority>2</priority>
</server>
<server>
<id>3</id>
<hostname>localhost</hostname>
<port>44446</port>
<can_become_leader>true</can_become_leader>
<start_as_follower>true</start_as_follower>
<priority>1</priority>
</server>
</raft_configuration>
</keeper_server>
<remote_servers>
<test_cluster_database_replicated>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
<replica>
<host>localhost</host>
<port>19000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>29000</port>
</replica>
</shard>
</test_cluster_database_replicated>
</remote_servers>
<_functional_tests_helper_database_replicated_replace_args_macros>1</_functional_tests_helper_database_replicated_replace_args_macros>
</yandex>

View File

@ -65,6 +65,31 @@ if [[ -n "$USE_DATABASE_ORDINARY" ]] && [[ "$USE_DATABASE_ORDINARY" -eq 1 ]]; th
fi
if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]; then
ln -sf $SRC_PATH/users.d/database_replicated.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/config.d/database_replicated.xml $DEST_SERVER_PATH/config.d/
rm /etc/clickhouse-server/config.d/zookeeper.xml
rm /etc/clickhouse-server/config.d/keeper_port.xml
# There is a bug in config reloading, so we cannot override macros using --macros.replica r2
# And we have to copy configs...
mkdir /etc/clickhouse-server1
mkdir /etc/clickhouse-server2
chown clickhouse /etc/clickhouse-server1
chown clickhouse /etc/clickhouse-server2
chgrp clickhouse /etc/clickhouse-server1
chgrp clickhouse /etc/clickhouse-server2
sudo -u clickhouse cp -r /etc/clickhouse-server/* /etc/clickhouse-server1
sudo -u clickhouse cp -r /etc/clickhouse-server/* /etc/clickhouse-server2
rm /etc/clickhouse-server1/config.d/macros.xml
rm /etc/clickhouse-server2/config.d/macros.xml
sudo -u clickhouse cat /etc/clickhouse-server/config.d/macros.xml | sed "s|<replica>r1</replica>|<replica>r2</replica>|" > /etc/clickhouse-server1/config.d/macros.xml
sudo -u clickhouse cat /etc/clickhouse-server/config.d/macros.xml | sed "s|<shard>s1</shard>|<shard>s2</shard>|" > /etc/clickhouse-server2/config.d/macros.xml
sudo mkdir /var/lib/clickhouse1
sudo mkdir /var/lib/clickhouse2
sudo chown clickhouse /var/lib/clickhouse1
sudo chown clickhouse /var/lib/clickhouse2
sudo chgrp clickhouse /var/lib/clickhouse1
sudo chgrp clickhouse /var/lib/clickhouse2
fi
ln -sf $SRC_PATH/client_config.xml $DEST_CLIENT_PATH/config.xml

View File

@ -0,0 +1,42 @@
<yandex>
<max_concurrent_queries>1000</max_concurrent_queries>
<remote_servers>
<one_shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node1_r1</host>
<port>9000</port>
</replica>
<replica>
<host>node1_r2</host>
<port>9000</port>
</replica>
</shard>
</one_shard>
<two_shards>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node1_r1</host>
<port>9000</port>
</replica>
<replica>
<host>node1_r2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node2_r1</host>
<port>9000</port>
</replica>
<replica>
<host>node2_r2</host>
<port>9000</port>
</replica>
</shard>
</two_shards>
</remote_servers>
</yandex>

View File

@ -0,0 +1,103 @@
# pylint: disable=redefined-outer-name
# pylint: disable=unused-argument
# pylint: disable=line-too-long
import shlex
import itertools
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1_r1 = cluster.add_instance('node1_r1', main_configs=['configs/remote_servers.xml'])
node2_r1 = cluster.add_instance('node2_r1', main_configs=['configs/remote_servers.xml'])
node1_r2 = cluster.add_instance('node1_r2', main_configs=['configs/remote_servers.xml'])
node2_r2 = cluster.add_instance('node2_r2', main_configs=['configs/remote_servers.xml'])
def run_benchmark(payload, settings):
node1_r1.exec_in_container([
'bash', '-c', 'echo {} | '.format(shlex.quote(payload.strip())) + ' '.join([
'clickhouse', 'benchmark',
'--concurrency=100',
'--cumulative',
'--delay=0',
# NOTE: with current matrix even 3 seconds it huge...
'--timelimit=3',
# tune some basic timeouts
'--hedged_connection_timeout_ms=200',
'--connect_timeout_with_failover_ms=200',
'--connections_with_failover_max_tries=5',
*settings,
])
])
@pytest.fixture(scope='module')
def started_cluster():
try:
cluster.start()
for _, instance in cluster.instances.items():
instance.query("""
create table if not exists data (
key Int,
/* just to increase block size */
v1 UInt64,
v2 UInt64,
v3 UInt64,
v4 UInt64,
v5 UInt64,
v6 UInt64,
v7 UInt64,
v8 UInt64,
v9 UInt64,
v10 UInt64,
v11 UInt64,
v12 UInt64
) Engine=MergeTree() order by key partition by key%5;
insert into data (key) select * from numbers(10);
create table if not exists dist_one as data engine=Distributed(one_shard, currentDatabase(), data, key);
create table if not exists dist_one_over_dist as data engine=Distributed(one_shard, currentDatabase(), dist_one, yandexConsistentHash(key, 2));
create table if not exists dist_two as data engine=Distributed(two_shards, currentDatabase(), data, key);
create table if not exists dist_two_over_dist as data engine=Distributed(two_shards, currentDatabase(), dist_two, yandexConsistentHash(key, 2));
""")
yield cluster
finally:
cluster.shutdown()
# since it includes started_cluster fixture at first start
@pytest.mark.timeout(60)
@pytest.mark.parametrize('table,settings', itertools.product(
[ # tables
'dist_one',
'dist_one_over_dist',
'dist_two',
'dist_two_over_dist',
],
[ # settings
*list(itertools.combinations([
'', # defaults
'--prefer_localhost_replica=0',
'--async_socket_for_remote=0',
'--use_hedged_requests=0',
'--optimize_skip_unused_shards=1',
'--distributed_group_by_no_merge=2',
'--optimize_distributed_group_by_sharding_key=1',
# TODO: enlarge test matrix (but first those values to accept ms):
#
# - sleep_in_send_tables_status
# - sleep_in_send_data
], 2))
# TODO: more combinations that just 2
],
))
def test_stress_distributed(table, settings, started_cluster):
payload = f'''
select * from {table} where key = 0;
select * from {table} where key = 1;
select * from {table} where key = 2;
select * from {table} where key = 3;
select * from {table};
'''
run_benchmark(payload, settings)

View File

@ -42,7 +42,7 @@ class MySQLNodeInstance:
if not os.path.exists(self.instances_dir):
os.mkdir(self.instances_dir)
self.docker_logs_path = p.join(self.instances_dir, 'docker_mysql.log')
self.start_up = False
def alloc_connection(self):
if self.mysql_connection is None:
@ -78,12 +78,16 @@ class MySQLNodeInstance:
return cursor.fetchall()
def start_and_wait(self):
if self.start_up:
return
run_and_check(['docker-compose',
'-p', cluster.project_name,
'-f', self.docker_compose,
'up', '--no-recreate', '-d',
])
'-p', cluster.project_name,
'-f', self.docker_compose,
'up', '--no-recreate', '-d',
])
self.wait_mysql_to_start(120)
self.start_up = True
def close(self):
if self.mysql_connection is not None:
@ -99,6 +103,8 @@ class MySQLNodeInstance:
except Exception as e:
print("Unable to get logs from docker mysql.")
self.start_up = False
def wait_mysql_to_start(self, timeout=60):
start = time.time()
while time.time() - start < timeout:
@ -113,32 +119,32 @@ class MySQLNodeInstance:
run_and_check(['docker-compose', 'ps', '--services', 'all'])
raise Exception("Cannot wait MySQL container")
mysql_5_7_docker_compose = os.path.join(DOCKER_COMPOSE_PATH, 'docker_compose_mysql_5_7_for_materialize_mysql.yml')
mysql_5_7_node = MySQLNodeInstance('root', 'clickhouse', '127.0.0.1', 3308, mysql_5_7_docker_compose)
mysql_8_0_docker_compose = os.path.join(DOCKER_COMPOSE_PATH, 'docker_compose_mysql_8_0_for_materialize_mysql.yml')
mysql_8_0_node = MySQLNodeInstance('root', 'clickhouse', '127.0.0.1', 33308, mysql_8_0_docker_compose)
@pytest.fixture(scope="module")
def started_mysql_5_7():
docker_compose = os.path.join(DOCKER_COMPOSE_PATH, 'docker_compose_mysql_5_7_for_materialize_mysql.yml')
mysql_node = MySQLNodeInstance('root', 'clickhouse', '127.0.0.1', 3308, docker_compose)
try:
mysql_node.start_and_wait()
yield mysql_node
mysql_5_7_node.start_and_wait()
yield mysql_5_7_node
finally:
mysql_node.close()
run_and_check(['docker-compose', '-p', cluster.project_name, '-f', docker_compose, 'down', '--volumes',
'--remove-orphans'])
mysql_5_7_node.close()
run_and_check(['docker-compose', '-p', cluster.project_name, '-f', mysql_5_7_docker_compose, 'down', '--volumes', '--remove-orphans'])
@pytest.fixture(scope="module")
def started_mysql_8_0():
docker_compose = os.path.join(DOCKER_COMPOSE_PATH, 'docker_compose_mysql_8_0_for_materialize_mysql.yml')
mysql_node = MySQLNodeInstance('root', 'clickhouse', '127.0.0.1', 33308, docker_compose)
try:
mysql_node.start_and_wait()
yield mysql_node
mysql_8_0_node.start_and_wait()
yield mysql_8_0_node
finally:
mysql_node.close()
run_and_check(['docker-compose', '-p', cluster.project_name, '-f', docker_compose, 'down', '--volumes',
'--remove-orphans'])
mysql_8_0_node.close()
run_and_check(['docker-compose', '-p', cluster.project_name, '-f', mysql_8_0_docker_compose, 'down', '--volumes', '--remove-orphans'])
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
@ -146,11 +152,13 @@ def test_materialize_database_dml_with_mysql_5_7(started_cluster, started_mysql_
materialize_with_ddl.dml_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.materialize_mysql_database_with_datetime_and_decimal(clickhouse_node, started_mysql_5_7, "mysql1")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_materialize_database_dml_with_mysql_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.dml_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql8_0")
materialize_with_ddl.materialize_mysql_database_with_datetime_and_decimal(clickhouse_node, started_mysql_8_0, "mysql8_0")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_materialize_database_ddl_with_mysql_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.drop_table_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1")
@ -163,6 +171,7 @@ def test_materialize_database_ddl_with_mysql_5_7(started_cluster, started_mysql_
materialize_with_ddl.alter_rename_table_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.alter_modify_column_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_materialize_database_ddl_with_mysql_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.drop_table_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql8_0")
@ -179,10 +188,12 @@ def test_materialize_database_ddl_with_mysql_8_0(started_cluster, started_mysql_
materialize_with_ddl.alter_modify_column_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0,
"mysql8_0")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_materialize_database_ddl_with_empty_transaction_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.query_event_with_empty_transaction(clickhouse_node, started_mysql_5_7, "mysql1")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_materialize_database_ddl_with_empty_transaction_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.query_event_with_empty_transaction(clickhouse_node, started_mysql_8_0, "mysql8_0")
@ -217,52 +228,64 @@ def test_materialize_database_err_sync_user_privs_5_7(started_cluster, started_m
def test_materialize_database_err_sync_user_privs_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.err_sync_user_privs_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql8_0")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_network_partition_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.network_partition_test(clickhouse_node, started_mysql_5_7, "mysql1")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_network_partition_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.network_partition_test(clickhouse_node, started_mysql_8_0, "mysql8_0")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_mysql_kill_sync_thread_restore_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.mysql_kill_sync_thread_restore_test(clickhouse_node, started_mysql_5_7, "mysql1")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_mysql_kill_sync_thread_restore_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.mysql_kill_sync_thread_restore_test(clickhouse_node, started_mysql_8_0, "mysql8_0")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_mysql_killed_while_insert_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.mysql_killed_while_insert(clickhouse_node, started_mysql_5_7, "mysql1")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_mysql_killed_while_insert_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.mysql_killed_while_insert(clickhouse_node, started_mysql_8_0, "mysql8_0")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_clickhouse_killed_while_insert_5_7(started_cluster, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.clickhouse_killed_while_insert(clickhouse_node, started_mysql_5_7, "mysql1")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_atomic])
def test_clickhouse_killed_while_insert_8_0(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.clickhouse_killed_while_insert(clickhouse_node, started_mysql_8_0, "mysql8_0")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_ordinary])
def test_utf8mb4(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.utf8mb4_test(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.utf8mb4_test(clickhouse_node, started_mysql_8_0, "mysql8_0")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_ordinary])
def test_system_parts_table(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.system_parts_test(clickhouse_node, started_mysql_8_0, "mysql8_0")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_ordinary])
def test_multi_table_update(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.multi_table_update_test(clickhouse_node, started_mysql_5_7, "mysql1")
materialize_with_ddl.multi_table_update_test(clickhouse_node, started_mysql_8_0, "mysql8_0")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_ordinary])
def test_system_tables_table(started_cluster, started_mysql_8_0, clickhouse_node):
materialize_with_ddl.system_tables_test(clickhouse_node, started_mysql_8_0, "mysql8_0")

View File

@ -0,0 +1,35 @@
<yandex>
<storage_configuration>
<disks>
<s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/another_data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<send_metadata>false</send_metadata>
<list_object_keys_size>1</list_object_keys_size> <!-- To effectively test restore parallelism -->
<retry_attempts>0</retry_attempts>
</s3>
<hdd>
<type>local</type>
<path>/</path>
</hdd>
</disks>
<policies>
<s3>
<volumes>
<main>
<disk>s3</disk>
</main>
<external>
<disk>hdd</disk>
</external>
</volumes>
</s3>
</policies>
</storage_configuration>
<merge_tree>
<min_bytes_for_wide_part>0</min_bytes_for_wide_part>
</merge_tree>
</yandex>

View File

@ -1,3 +1,4 @@
import os
import logging
import random
import string
@ -10,6 +11,20 @@ logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
CONFIG_PATH = os.path.join(SCRIPT_DIR, './_instances/node_not_restorable/configs/config.d/storage_conf_not_restorable.xml')
def replace_config(old, new):
config = open(CONFIG_PATH, 'r')
config_lines = config.readlines()
config.close()
config_lines = [line.replace(old, new) for line in config_lines]
config = open(CONFIG_PATH, 'w')
config.writelines(config_lines)
config.close()
@pytest.fixture(scope="module")
def cluster():
try:
@ -26,6 +41,10 @@ def cluster():
"configs/config.d/storage_conf_another_bucket_path.xml",
"configs/config.d/bg_processing_pool_conf.xml",
"configs/config.d/log_conf.xml"], user_configs=[], stay_alive=True)
cluster.add_instance("node_not_restorable", main_configs=[
"configs/config.d/storage_conf_not_restorable.xml",
"configs/config.d/bg_processing_pool_conf.xml",
"configs/config.d/log_conf.xml"], user_configs=[], stay_alive=True)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
@ -75,6 +94,8 @@ def create_table(node, table_name, additional_settings=None):
def purge_s3(cluster, bucket):
minio = cluster.minio_client
for obj in list(minio.list_objects(bucket, recursive=True)):
if str(obj.object_name).find(".SCHEMA_VERSION") != -1:
continue
minio.remove_object(bucket, obj.object_name)
@ -103,7 +124,7 @@ def get_revision_counter(node, backup_number):
def drop_table(cluster):
yield
node_names = ["node", "node_another_bucket", "node_another_bucket_path"]
node_names = ["node", "node_another_bucket", "node_another_bucket_path", "node_not_restorable"]
for node_name in node_names:
node = cluster.instances[node_name]
@ -311,3 +332,40 @@ def test_restore_mutations(cluster):
assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)
assert node_another_bucket.query("SELECT sum(counter) FROM s3.test FORMAT Values") == "({})".format(4096 * 2)
assert node_another_bucket.query("SELECT sum(counter) FROM s3.test WHERE id > 0 FORMAT Values") == "({})".format(4096)
def test_migrate_to_restorable_schema(cluster):
node = cluster.instances["node_not_restorable"]
create_table(node, "test")
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-03', 4096)))
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-04', 4096, -1)))
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-05', 4096)))
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-05', 4096, -1)))
replace_config("<send_metadata>false</send_metadata>", "<send_metadata>true</send_metadata>")
node.restart_clickhouse()
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-06', 4096)))
node.query("INSERT INTO s3.test VALUES {}".format(generate_values('2020-01-06', 4096, -1)))
node.query("ALTER TABLE s3.test FREEZE")
revision = get_revision_counter(node, 1)
assert revision != 0
node_another_bucket = cluster.instances["node_another_bucket"]
create_table(node_another_bucket, "test")
# Restore to revision before mutation.
node_another_bucket.stop_clickhouse()
drop_s3_metadata(node_another_bucket)
purge_s3(cluster, cluster.minio_bucket_2)
create_restore_file(node_another_bucket, revision=revision, bucket="root", path="another_data")
node_another_bucket.start_clickhouse(10)
assert node_another_bucket.query("SELECT count(*) FROM s3.test FORMAT Values") == "({})".format(4096 * 6)
assert node_another_bucket.query("SELECT sum(id) FROM s3.test FORMAT Values") == "({})".format(0)

View File

@ -35,8 +35,17 @@ def started_cluster():
cluster.shutdown()
def test_create_replicated_table(started_cluster):
assert "Explicit zookeeper_path and replica_name are specified" in \
main_node.query_and_get_error("CREATE TABLE testdb.replicated_table (d Date, k UInt64, i32 Int32) "
"ENGINE=ReplicatedMergeTree('/test/tmp', 'r') ORDER BY k PARTITION BY toYYYYMM(d);")
assert "Explicit zookeeper_path and replica_name are specified" in \
main_node.query_and_get_error("CREATE TABLE testdb.replicated_table (d Date, k UInt64, i32 Int32) "
"ENGINE=ReplicatedMergeTree('/test/tmp', 'r', d, k, 8192);")
assert "Old syntax is not allowed" in \
main_node.query_and_get_error("CREATE TABLE testdb.replicated_table (d Date, k UInt64, i32 Int32) ENGINE=ReplicatedMergeTree('/test/tmp', 'r', d, k, 8192);")
main_node.query_and_get_error("CREATE TABLE testdb.replicated_table (d Date, k UInt64, i32 Int32) "
"ENGINE=ReplicatedMergeTree('/test/tmp/{shard}', '{replica}', d, k, 8192);")
main_node.query("CREATE TABLE testdb.replicated_table (d Date, k UInt64, i32 Int32) ENGINE=ReplicatedMergeTree ORDER BY k PARTITION BY toYYYYMM(d);")

View File

@ -1,3 +1,3 @@
# Introduction to jepsen.nukeeper
# Introduction to jepsen.keeper
TODO: write [great documentation](http://jacobian.org/writing/what-to-write/)

View File

@ -1,4 +1,4 @@
(defproject jepsen.nukeeper "0.1.0-SNAPSHOT"
(defproject jepsen.keeper "0.1.0-SNAPSHOT"
:injections [(.. System (setProperty "zookeeper.request.timeout" "10000"))]
:description "A jepsen tests for ClickHouse Keeper"
:url "https://clickhouse.tech/"

View File

@ -15,7 +15,7 @@
(dorun (map (fn [v] (zk/delete conn v)) (take 10 (zk-range)))))
(deftest a-test
(testing "nukeeper connection"
(testing "keeper connection"
(.setLevel
(LoggerFactory/getLogger "org.apache.zookeeper") Level/OFF)
(let [conn (zk/connect "localhost:9181" :timeout-msec 5000)]

View File

@ -4,13 +4,13 @@ DROP TABLE IF EXISTS part_header_r2;
SET replication_alter_partitions_sync = 2;
CREATE TABLE part_header_r1(x UInt32, y UInt32)
ENGINE ReplicatedMergeTree('/clickhouse/tables/test_00814/part_header', '1') ORDER BY x
ENGINE ReplicatedMergeTree('/clickhouse/tables/'||currentDatabase()||'/test_00814/part_header/{shard}', '1{replica}') ORDER BY x
SETTINGS use_minimalistic_part_header_in_zookeeper = 0,
old_parts_lifetime = 1,
cleanup_delay_period = 0,
cleanup_delay_period_random_add = 0;
CREATE TABLE part_header_r2(x UInt32, y UInt32)
ENGINE ReplicatedMergeTree('/clickhouse/tables/test_00814/part_header', '2') ORDER BY x
ENGINE ReplicatedMergeTree('/clickhouse/tables/'||currentDatabase()||'/test_00814/part_header/{shard}', '2{replica}') ORDER BY x
SETTINGS use_minimalistic_part_header_in_zookeeper = 1,
old_parts_lifetime = 1,
cleanup_delay_period = 0,
@ -39,10 +39,10 @@ SELECT sleep(3) FORMAT Null;
SELECT '*** Test part removal ***';
SELECT '*** replica 1 ***';
SELECT name FROM system.parts WHERE active AND database = currentDatabase() AND table = 'part_header_r1';
SELECT name FROM system.zookeeper WHERE path = '/clickhouse/tables/test_00814/part_header/replicas/1/parts';
SELECT name FROM system.zookeeper WHERE path = '/clickhouse/tables/'||currentDatabase()||'/test_00814/part_header/s1/replicas/1r1/parts';
SELECT '*** replica 2 ***';
SELECT name FROM system.parts WHERE active AND database = currentDatabase() AND table = 'part_header_r2';
SELECT name FROM system.zookeeper WHERE path = '/clickhouse/tables/test_00814/part_header/replicas/1/parts';
SELECT name FROM system.zookeeper WHERE path = '/clickhouse/tables/'||currentDatabase()||'/test_00814/part_header/s1/replicas/1r1/parts';
SELECT '*** Test ALTER ***';
ALTER TABLE part_header_r1 MODIFY COLUMN y String;

View File

@ -15,7 +15,7 @@ CREATE TABLE elog (
engine_id UInt32,
referrer String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/elog', 'test')
ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/elog/{shard}', '{replica}')
PARTITION BY date
ORDER BY (engine_id)
SETTINGS replicated_deduplication_window = 2, cleanup_delay_period=4, cleanup_delay_period_random_add=0;"
@ -28,35 +28,37 @@ $CLICKHOUSE_CLIENT --query="INSERT INTO elog VALUES (toDate('2018-10-01'), 3, 'h
$CLICKHOUSE_CLIENT --query="SELECT count(*) from elog" # 3 rows
count=$($CLICKHOUSE_CLIENT --query="SELECT COUNT(*) FROM system.zookeeper where path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/elog/blocks'")
count=$($CLICKHOUSE_CLIENT --query="SELECT COUNT(*) FROM system.zookeeper where path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/elog/s1/blocks'")
while [[ $count != 2 ]]
do
sleep 1
count=$($CLICKHOUSE_CLIENT --query="SELECT COUNT(*) FROM system.zookeeper where path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/elog/blocks'")
count=$($CLICKHOUSE_CLIENT --query="SELECT COUNT(*) FROM system.zookeeper where path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/elog/s1/blocks'")
done
$CLICKHOUSE_CLIENT --query="INSERT INTO elog VALUES (toDate('2018-10-01'), 1, 'hello')"
$CLICKHOUSE_CLIENT --query="SELECT count(*) from elog" # 4 rows
count=$($CLICKHOUSE_CLIENT --query="SELECT COUNT(*) FROM system.zookeeper where path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/elog/blocks'")
count=$($CLICKHOUSE_CLIENT --query="SELECT COUNT(*) FROM system.zookeeper where path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/elog/s1/blocks'")
while [[ $count != 2 ]]
do
sleep 1
count=$($CLICKHOUSE_CLIENT --query="SELECT COUNT(*) FROM system.zookeeper where path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/elog/blocks'")
count=$($CLICKHOUSE_CLIENT --query="SELECT COUNT(*) FROM system.zookeeper where path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/elog/s1/blocks'")
done
$CLICKHOUSE_CLIENT --query="INSERT INTO elog VALUES (toDate('2018-10-01'), 2, 'hello')"
$CLICKHOUSE_CLIENT --query="SELECT count(*) from elog" # 5 rows
count=$($CLICKHOUSE_CLIENT --query="SELECT COUNT(*) FROM system.zookeeper where path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/elog/blocks'")
count=$($CLICKHOUSE_CLIENT --query="SELECT COUNT(*) FROM system.zookeeper where path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/elog/s1/blocks'")
while [[ $count != 2 ]]
do
sleep 1
count=$($CLICKHOUSE_CLIENT --query="SELECT COUNT(*) FROM system.zookeeper where path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/elog/blocks'")
count=$($CLICKHOUSE_CLIENT --query="SELECT COUNT(*) FROM system.zookeeper where path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/elog/s1/blocks'")
done
$CLICKHOUSE_CLIENT --query="INSERT INTO elog VALUES (toDate('2018-10-01'), 2, 'hello')"

View File

@ -10,7 +10,8 @@ SELECT count() FROM merge_tree;
SET max_rows_to_read = 900000;
SELECT count() FROM merge_tree WHERE not ignore(); -- { serverError 158 }
SELECT count() FROM merge_tree WHERE not ignore(); -- { serverError 158 }
-- constant ignore will be pruned by part pruner. ignore(*) is used.
SELECT count() FROM merge_tree WHERE not ignore(*); -- { serverError 158 }
SELECT count() FROM merge_tree WHERE not ignore(*); -- { serverError 158 }
DROP TABLE merge_tree;

View File

@ -8,13 +8,13 @@ CREATE TABLE versioned_collapsing_table(
sign Int8,
version UInt16
)
ENGINE = ReplicatedVersionedCollapsingMergeTree('/clickhouse/versioned_collapsing_table', '1', sign, version)
ENGINE = ReplicatedVersionedCollapsingMergeTree('/clickhouse/versioned_collapsing_table/{shard}', '{replica}', sign, version)
PARTITION BY d
ORDER BY (key1, key2);
INSERT INTO versioned_collapsing_table VALUES (toDate('2019-10-10'), 1, 1, 'Hello', -1, 1);
SELECT value FROM system.zookeeper WHERE path = '/clickhouse/versioned_collapsing_table' and name = 'metadata';
SELECT value FROM system.zookeeper WHERE path = '/clickhouse/versioned_collapsing_table/s1' and name = 'metadata';
SELECT COUNT() FROM versioned_collapsing_table;

View File

@ -12,8 +12,8 @@ SCALE=5000
$CLICKHOUSE_CLIENT -n --query "
DROP TABLE IF EXISTS r1;
DROP TABLE IF EXISTS r2;
CREATE TABLE r1 (x UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/r', '1') ORDER BY x SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 1, parts_to_throw_insert = 100000, max_replicated_logs_to_keep = 10;
CREATE TABLE r2 (x UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/r', '2') ORDER BY x SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 1, parts_to_throw_insert = 100000, max_replicated_logs_to_keep = 10;
CREATE TABLE r1 (x UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/{shard}', '1{replica}') ORDER BY x SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 1, parts_to_throw_insert = 100000, max_replicated_logs_to_keep = 10;
CREATE TABLE r2 (x UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/{shard}', '2{replica}') ORDER BY x SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 0, cleanup_delay_period_random_add = 1, parts_to_throw_insert = 100000, max_replicated_logs_to_keep = 10;
DETACH TABLE r2;
"
@ -29,16 +29,16 @@ for _ in {1..60}; do
done
$CLICKHOUSE_CLIENT --query "SELECT numChildren < $((SCALE / 4)) FROM system.zookeeper WHERE path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/r' AND name = 'log'";
$CLICKHOUSE_CLIENT --query "SELECT numChildren < $((SCALE / 4)) FROM system.zookeeper WHERE path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/s1' AND name = 'log'";
echo -e '\n---\n';
$CLICKHOUSE_CLIENT --query "SELECT value FROM system.zookeeper WHERE path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/r/replicas/1' AND name = 'is_lost'";
$CLICKHOUSE_CLIENT --query "SELECT value FROM system.zookeeper WHERE path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/r/replicas/2' AND name = 'is_lost'";
$CLICKHOUSE_CLIENT --query "SELECT value FROM system.zookeeper WHERE path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/s1/replicas/1r1' AND name = 'is_lost'";
$CLICKHOUSE_CLIENT --query "SELECT value FROM system.zookeeper WHERE path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/s1/replicas/2r1' AND name = 'is_lost'";
echo -e '\n---\n';
$CLICKHOUSE_CLIENT --query "ATTACH TABLE r2"
$CLICKHOUSE_CLIENT --receive_timeout 600 --query "SYSTEM SYNC REPLICA r2" # Need to increase timeout, otherwise it timed out in debug build
$CLICKHOUSE_CLIENT --query "SELECT value FROM system.zookeeper WHERE path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/r/replicas/2' AND name = 'is_lost'";
$CLICKHOUSE_CLIENT --query "SELECT value FROM system.zookeeper WHERE path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/s1/replicas/2r1' AND name = 'is_lost'";
$CLICKHOUSE_CLIENT -n --query "
DROP TABLE IF EXISTS r1;

View File

@ -2,4 +2,4 @@
10
10
24
CREATE TABLE default.replicated_mutations_empty_partitions\n(\n `key` UInt64,\n `value` UInt64\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/test/01586_replicated_mutations_empty_partitions\', \'1\')\nPARTITION BY key\nORDER BY key\nSETTINGS index_granularity = 8192
CREATE TABLE default.replicated_mutations_empty_partitions\n(\n `key` UInt64,\n `value` UInt64\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/test/default/01586_replicated_mutations_empty_partitions/{shard}\', \'{replica}\')\nPARTITION BY key\nORDER BY key\nSETTINGS index_granularity = 8192

View File

@ -5,7 +5,7 @@ CREATE TABLE replicated_mutations_empty_partitions
key UInt64,
value String
)
ENGINE = ReplicatedMergeTree('/clickhouse/test/01586_replicated_mutations_empty_partitions', '1')
ENGINE = ReplicatedMergeTree('/clickhouse/test/'||currentDatabase()||'/01586_replicated_mutations_empty_partitions/{shard}', '{replica}')
ORDER BY key
PARTITION by key;
@ -13,7 +13,7 @@ INSERT INTO replicated_mutations_empty_partitions SELECT number, toString(number
SELECT count(distinct value) FROM replicated_mutations_empty_partitions;
SELECT count() FROM system.zookeeper WHERE path = '/clickhouse/test/01586_replicated_mutations_empty_partitions/block_numbers';
SELECT count() FROM system.zookeeper WHERE path = '/clickhouse/test/'||currentDatabase()||'/01586_replicated_mutations_empty_partitions/s1/block_numbers';
ALTER TABLE replicated_mutations_empty_partitions DROP PARTITION '3';
ALTER TABLE replicated_mutations_empty_partitions DROP PARTITION '4';
@ -21,7 +21,7 @@ ALTER TABLE replicated_mutations_empty_partitions DROP PARTITION '5';
ALTER TABLE replicated_mutations_empty_partitions DROP PARTITION '9';
-- still ten records
SELECT count() FROM system.zookeeper WHERE path = '/clickhouse/test/01586_replicated_mutations_empty_partitions/block_numbers';
SELECT count() FROM system.zookeeper WHERE path = '/clickhouse/test/'||currentDatabase()||'/01586_replicated_mutations_empty_partitions/s1/block_numbers';
ALTER TABLE replicated_mutations_empty_partitions MODIFY COLUMN value UInt64 SETTINGS replication_alter_partitions_sync=2;

View File

@ -1,16 +0,0 @@
499999500000
499999500000
499999500000
499999500000
499999500000
Metadata version on replica 1 equal with first replica, OK
CREATE TABLE default.concurrent_kill_1\n(\n `key` UInt64,\n `value` Int64\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/01593_concurrent_alter_mutations_kill_many_replicas_default\', \'1\')\nORDER BY key\nSETTINGS max_replicated_mutations_in_queue = 1000, number_of_free_entries_in_pool_to_execute_mutation = 0, max_replicated_merges_in_queue = 1000, index_granularity = 8192
Metadata version on replica 2 equal with first replica, OK
CREATE TABLE default.concurrent_kill_2\n(\n `key` UInt64,\n `value` Int64\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/01593_concurrent_alter_mutations_kill_many_replicas_default\', \'2\')\nORDER BY key\nSETTINGS max_replicated_mutations_in_queue = 1000, number_of_free_entries_in_pool_to_execute_mutation = 0, max_replicated_merges_in_queue = 1000, index_granularity = 8192
Metadata version on replica 3 equal with first replica, OK
CREATE TABLE default.concurrent_kill_3\n(\n `key` UInt64,\n `value` Int64\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/01593_concurrent_alter_mutations_kill_many_replicas_default\', \'3\')\nORDER BY key\nSETTINGS max_replicated_mutations_in_queue = 1000, number_of_free_entries_in_pool_to_execute_mutation = 0, max_replicated_merges_in_queue = 1000, index_granularity = 8192
Metadata version on replica 4 equal with first replica, OK
CREATE TABLE default.concurrent_kill_4\n(\n `key` UInt64,\n `value` Int64\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/01593_concurrent_alter_mutations_kill_many_replicas_default\', \'4\')\nORDER BY key\nSETTINGS max_replicated_mutations_in_queue = 1000, number_of_free_entries_in_pool_to_execute_mutation = 0, max_replicated_merges_in_queue = 1000, index_granularity = 8192
Metadata version on replica 5 equal with first replica, OK
CREATE TABLE default.concurrent_kill_5\n(\n `key` UInt64,\n `value` Int64\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/01593_concurrent_alter_mutations_kill_many_replicas_default\', \'5\')\nORDER BY key\nSETTINGS max_replicated_mutations_in_queue = 1000, number_of_free_entries_in_pool_to_execute_mutation = 0, max_replicated_merges_in_queue = 1000, index_granularity = 8192
499999500000

View File

@ -0,0 +1,16 @@
499999500000
499999500000
499999500000
499999500000
499999500000
Metadata version on replica 1 equal with first replica, OK
CREATE TABLE default.concurrent_kill_1\n(\n `key` UInt64,\n `value` Int64\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/01593_concurrent_alter_mutations_kill_many_replicas_long_default/{shard}\', \'{replica}1\')\nORDER BY key\nSETTINGS max_replicated_mutations_in_queue = 1000, number_of_free_entries_in_pool_to_execute_mutation = 0, max_replicated_merges_in_queue = 1000, index_granularity = 8192
Metadata version on replica 2 equal with first replica, OK
CREATE TABLE default.concurrent_kill_2\n(\n `key` UInt64,\n `value` Int64\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/01593_concurrent_alter_mutations_kill_many_replicas_long_default/{shard}\', \'{replica}2\')\nORDER BY key\nSETTINGS max_replicated_mutations_in_queue = 1000, number_of_free_entries_in_pool_to_execute_mutation = 0, max_replicated_merges_in_queue = 1000, index_granularity = 8192
Metadata version on replica 3 equal with first replica, OK
CREATE TABLE default.concurrent_kill_3\n(\n `key` UInt64,\n `value` Int64\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/01593_concurrent_alter_mutations_kill_many_replicas_long_default/{shard}\', \'{replica}3\')\nORDER BY key\nSETTINGS max_replicated_mutations_in_queue = 1000, number_of_free_entries_in_pool_to_execute_mutation = 0, max_replicated_merges_in_queue = 1000, index_granularity = 8192
Metadata version on replica 4 equal with first replica, OK
CREATE TABLE default.concurrent_kill_4\n(\n `key` UInt64,\n `value` Int64\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/01593_concurrent_alter_mutations_kill_many_replicas_long_default/{shard}\', \'{replica}4\')\nORDER BY key\nSETTINGS max_replicated_mutations_in_queue = 1000, number_of_free_entries_in_pool_to_execute_mutation = 0, max_replicated_merges_in_queue = 1000, index_granularity = 8192
Metadata version on replica 5 equal with first replica, OK
CREATE TABLE default.concurrent_kill_5\n(\n `key` UInt64,\n `value` Int64\n)\nENGINE = ReplicatedMergeTree(\'/clickhouse/tables/01593_concurrent_alter_mutations_kill_many_replicas_long_default/{shard}\', \'{replica}5\')\nORDER BY key\nSETTINGS max_replicated_mutations_in_queue = 1000, number_of_free_entries_in_pool_to_execute_mutation = 0, max_replicated_merges_in_queue = 1000, index_granularity = 8192
499999500000

View File

@ -11,7 +11,10 @@ for i in $(seq $REPLICAS); do
done
for i in $(seq $REPLICAS); do
$CLICKHOUSE_CLIENT --query "CREATE TABLE concurrent_kill_$i (key UInt64, value String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX', '$i') ORDER BY key SETTINGS max_replicated_mutations_in_queue=1000, number_of_free_entries_in_pool_to_execute_mutation=0,max_replicated_merges_in_queue=1000"
$CLICKHOUSE_CLIENT --query "CREATE TABLE concurrent_kill_$i (key UInt64, value String) ENGINE =
ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/{shard}', '{replica}$i') ORDER BY key
SETTINGS max_replicated_mutations_in_queue=1000, number_of_free_entries_in_pool_to_execute_mutation=0,max_replicated_merges_in_queue=1000"
done
$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_kill_1 SELECT number, toString(number) FROM numbers(1000000)"
@ -77,9 +80,10 @@ while true; do
done
metadata_version=$($CLICKHOUSE_CLIENT --query "SELECT value FROM system.zookeeper WHERE path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/replicas/$i/' and name = 'metadata_version'")
metadata_version=$($CLICKHOUSE_CLIENT --query "SELECT value FROM system.zookeeper WHERE path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/s1/replicas/r1$i/' and name = 'metadata_version'")
for i in $(seq $REPLICAS); do
replica_metadata_version=$($CLICKHOUSE_CLIENT --query "SELECT value FROM system.zookeeper WHERE path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/replicas/$i/' and name = 'metadata_version'")
replica_metadata_version=$($CLICKHOUSE_CLIENT --query "SELECT value FROM system.zookeeper WHERE path = '/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/s1/replicas/r1$i/' and name = 'metadata_version'")
if [ "$metadata_version" != "$replica_metadata_version" ]; then
echo "Metadata version on replica $i differs from the first replica, FAIL"
else

View File

@ -8,7 +8,7 @@ INSERT INTO xp SELECT '2020-01-01', number, '' FROM numbers(100000);
CREATE TABLE xp_d AS xp ENGINE = Distributed(test_shard_localhost, currentDatabase(), xp);
SELECT count(7 = (SELECT number FROM numbers(0) ORDER BY number ASC NULLS FIRST LIMIT 7)) FROM xp_d PREWHERE toYYYYMM(A) GLOBAL IN (SELECT NULL = (SELECT number FROM numbers(1) ORDER BY number DESC NULLS LAST LIMIT 1), toYYYYMM(min(A)) FROM xp_d) WHERE B > NULL; -- { serverError 20 }
SELECT count(7 = (SELECT number FROM numbers(0) ORDER BY number ASC NULLS FIRST LIMIT 7)) FROM xp_d PREWHERE toYYYYMM(A) GLOBAL IN (SELECT NULL = (SELECT number FROM numbers(1) ORDER BY number DESC NULLS LAST LIMIT 1), toYYYYMM(min(A)) FROM xp_d) WHERE B > NULL; -- B > NULL is evaluated to 0 and this works
SELECT count() FROM xp_d WHERE A GLOBAL IN (SELECT NULL); -- { serverError 53 }

View File

@ -1,16 +1,14 @@
block_numbers
blocks
1
r1
========
block_numbers
blocks
1
r1
========
block_numbers
blocks
========
1
failed_parts
last_part
leader_election-0000000000
parallel

View File

@ -3,17 +3,20 @@ DROP TABLE IF EXISTS sample_table;
CREATE TABLE sample_table (
key UInt64
)
ENGINE ReplicatedMergeTree('/clickhouse/01700_system_zookeeper_path_in', '1')
ENGINE ReplicatedMergeTree('/clickhouse/01700_system_zookeeper_path_in/{shard}', '{replica}')
ORDER BY tuple();
SELECT name FROM system.zookeeper WHERE path = '/clickhouse/01700_system_zookeeper_path_in' AND name like 'block%' ORDER BY name;
SELECT name FROM system.zookeeper WHERE path = '/clickhouse/01700_system_zookeeper_path_in/replicas' ORDER BY name;
SELECT name FROM system.zookeeper WHERE path = '/clickhouse/01700_system_zookeeper_path_in/s1' AND name like 'block%' ORDER BY name;
SELECT name FROM system.zookeeper WHERE path = '/clickhouse/01700_system_zookeeper_path_in/s1/replicas' AND name LIKE '%r1%' ORDER BY name;
SELECT '========';
SELECT name FROM system.zookeeper WHERE path IN ('/clickhouse/01700_system_zookeeper_path_in') AND name LIKE 'block%' ORDER BY name;
SELECT name FROM system.zookeeper WHERE path IN ('/clickhouse/01700_system_zookeeper_path_in/replicas') ORDER BY name;
SELECT name FROM system.zookeeper WHERE path IN ('/clickhouse/01700_system_zookeeper_path_in/s1') AND name LIKE 'block%' ORDER BY name;
SELECT name FROM system.zookeeper WHERE path IN ('/clickhouse/01700_system_zookeeper_path_in/s1/replicas') AND name LIKE '%r1%' ORDER BY name;
SELECT '========';
SELECT name FROM system.zookeeper WHERE path IN ('/clickhouse/01700_system_zookeeper_path_in','/clickhouse/01700_system_zookeeper_path_in/replicas') AND name LIKE 'block%' ORDER BY name;
SELECT name FROM system.zookeeper WHERE path IN ('/clickhouse/01700_system_zookeeper_path_in/s1',
'/clickhouse/01700_system_zookeeper_path_in/s1/replicas') AND name LIKE 'block%' ORDER BY name;
SELECT '========';
SELECT name FROM system.zookeeper WHERE path IN (SELECT concat('/clickhouse/01700_system_zookeeper_path_in/', name) FROM system.zookeeper WHERE (path = '/clickhouse/01700_system_zookeeper_path_in')) ORDER BY name;
SELECT name FROM system.zookeeper WHERE path IN (SELECT concat('/clickhouse/01700_system_zookeeper_path_in/s1/', name)
FROM system.zookeeper WHERE (name != 'replicas' AND name NOT LIKE 'leader_election%' AND path = '/clickhouse/01700_system_zookeeper_path_in/s1')) ORDER BY name;
DROP TABLE IF EXISTS sample_table;

View File

@ -0,0 +1,7 @@
1 1
1 2
1 3
1 1
1 2
1 3
3

View File

@ -0,0 +1,19 @@
drop table if exists x;
create table x (i int, j int) engine MergeTree partition by i order by j settings index_granularity = 1;
insert into x values (1, 1), (1, 2), (1, 3), (2, 4), (2, 5), (2, 6);
set max_rows_to_read = 3;
select * from x where _partition_id = partitionId(1);
set max_rows_to_read = 4; -- one row for subquery
select * from x where _partition_id in (select partitionId(number + 1) from numbers(1));
-- trivial count optimization test
set max_rows_to_read = 1; -- one row for subquery
select count() from x where _partition_id in (select partitionId(number + 1) from numbers(1));
drop table x;

View File

@ -0,0 +1 @@
SELECT range(toUInt256(1), 1); -- { serverError 44 }

View File

@ -118,6 +118,17 @@
"01148_zookeeper_path_macros_unfolding",
"01294_system_distributed_on_cluster",
"01269_create_with_null",
"01451_replicated_detach_drop_and_quorum",
"01188_attach_table_from_path",
"01149_zookeeper_mutation_stuck_after_replace_partition",
/// user_files
"01721_engine_file_truncate_on_insert",
/// Fails due to additional replicas or shards
"quorum",
"01650_drop_part_and_deduplication_zookeeper",
"01532_execute_merges_on_single_replica",
"00652_replicated_mutations_default_database_zookeeper",
"00620_optimize_on_nonleader_replica_zookeeper",
/// grep -c
"01018_ddl_dictionaries_bad_queries",
"00908_bloom_filter_index",
@ -136,6 +147,7 @@
"00626_replace_partition_from_table_zookeeper",
"00626_replace_partition_from_table",
"00152_insert_different_granularity",
"00054_merge_tree_partitions",
/// Old syntax is not allowed
"01062_alter_on_mutataion_zookeeper",
"00925_zookeeper_empty_replicated_merge_tree_optimize_final",
@ -151,7 +163,8 @@
"00083_create_merge_tree_zookeeper",
"00062_replicated_merge_tree_alter_zookeeper",
/// Does not support renaming of multiple tables in single query
"00634_rename_view"
"00634_rename_view",
"00140_rename"
],
"polymorphic-parts": [
"01508_partition_pruning_long", /// bug, shoud be fixed

View File

@ -28,6 +28,10 @@ def instrument_clickhouse_server_log(self, node=None, clickhouse_server_log="/va
try:
with And("adding test name start message to the clickhouse-server.log"):
node.command(f"echo -e \"\\n-- start: {current().name} --\\n\" >> {clickhouse_server_log}")
with And("dump memory info"):
node.command(f"echo -e \"\\n-- {current().name} -- top --\\n\" && top -bn1")
node.command(f"echo -e \"\\n-- {current().name} -- df --\\n\" && df -h")
node.command(f"echo -e \"\\n-- {current().name} -- free --\\n\" && free -mh")
yield
finally:

View File

@ -58,7 +58,7 @@ int main(int argc, char *argv[])
Poco::Logger::root().setChannel(channel);
Poco::Logger::root().setLevel("trace");
}
auto * logger = &Poco::Logger::get("nukeeper-dumper");
auto * logger = &Poco::Logger::get("keeper-dumper");
ResponsesQueue queue;
SnapshotsQueue snapshots_queue{1};
CoordinationSettingsPtr settings = std::make_shared<CoordinationSettings>();