Merge branch 'master' into mvcc_prototype

Alexander Tokmakov 2021-12-17 10:43:46 +03:00
commit 32e62ed5c2
149 changed files with 4303 additions and 1996 deletions


@ -8,6 +8,10 @@
name: Docker Container Scan (clickhouse-server)
env:
# Force the stdout and stderr streams to be unbuffered
PYTHONUNBUFFERED: 1
"on":
pull_request:
paths:


@ -1,4 +1,9 @@
name: CherryPick
env:
# Force the stdout and stderr streams to be unbuffered
PYTHONUNBUFFERED: 1
concurrency:
group: cherry-pick
on: # yamllint disable-line rule:truthy
@ -8,18 +13,23 @@ jobs:
CherryPick:
runs-on: [self-hosted, style-checker]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/cherry_pick
ROBOT_CLICKHOUSE_SSH_KEY<<RCSK
${{secrets.ROBOT_CLICKHOUSE_SSH_KEY}}
RCSK
REPO_OWNER=ClickHouse
REPO_NAME=ClickHouse
REPO_TEAM=core
EOF
- name: Check out repository code
uses: actions/checkout@v2
with:
token: ${{secrets.ROBOT_CLICKHOUSE_COMMIT_TOKEN}}
fetch-depth: 0
- name: Cherry pick
env:
TEMP_PATH: ${{runner.temp}}/cherry_pick
ROBOT_CLICKHOUSE_SSH_KEY: ${{secrets.ROBOT_CLICKHOUSE_SSH_KEY}}
REPO_OWNER: "ClickHouse"
REPO_NAME: "ClickHouse"
REPO_TEAM: "core"
run: |
sudo pip install GitPython
cd $GITHUB_WORKSPACE/tests/ci


@ -1,4 +1,9 @@
name: BackportPR
env:
# Force the stdout and stderr streams to be unbuffered
PYTHONUNBUFFERED: 1
on: # yamllint disable-line rule:truthy
push:
branches:
@ -7,6 +12,9 @@ jobs:
DockerHubPush:
runs-on: [self-hosted, style-checker]
steps:
- name: Clear repository
run: |
sudo rm -fr $GITHUB_WORKSPACE && mkdir $GITHUB_WORKSPACE
- name: Check out repository code
uses: actions/checkout@v2
- name: Images check
@ -22,17 +30,23 @@ jobs:
needs: [BuilderDebRelease]
runs-on: [self-hosted, style-checker]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/compatibility_check
REPO_COPY=${{runner.temp}}/compatibility_check/ClickHouse
REPORTS_PATH=${{runner.temp}}/reports_dir
EOF
- name: Clear repository
run: |
sudo rm -fr $GITHUB_WORKSPACE && mkdir $GITHUB_WORKSPACE
- name: Check out repository code
uses: actions/checkout@v2
- name: Download json reports
uses: actions/download-artifact@v2
with:
path: ${{runner.temp}}/reports_dir
path: ${{ env.REPORTS_PATH }}
- name: CompatibilityCheck
env:
TEMP_PATH: ${{runner.temp}}/compatibility_check
REPO_COPY: ${{runner.temp}}/compatibility_check/ClickHouse
REPORTS_PATH: ${{runner.temp}}/reports_dir
run: |
sudo rm -fr $TEMP_PATH
mkdir -p $TEMP_PATH
@ -51,24 +65,30 @@ jobs:
needs: [DockerHubPush]
runs-on: [self-hosted, builder]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/build_check
IMAGES_PATH=${{runner.temp}}/images_path
REPO_COPY=${{runner.temp}}/build_check/ClickHouse
CACHES_PATH=${{runner.temp}}/../ccaches
CHECK_NAME=ClickHouse build check (actions)
BUILD_NAME=package_release
EOF
- name: Download changed images
uses: actions/download-artifact@v2
with:
name: changed_images
path: ${{ runner.temp }}/images_path
path: ${{ env.IMAGES_PATH }}
- name: Clear repository
run: |
sudo rm -fr $GITHUB_WORKSPACE && mkdir $GITHUB_WORKSPACE
- name: Check out repository code
uses: actions/checkout@v2
with:
submodules: 'recursive'
submodules: 'true'
fetch-depth: 0 # otherwise we will have no info about contributors
- name: Build
env:
TEMP_PATH: ${{runner.temp}}/build_check
IMAGES_PATH: ${{runner.temp}}/images_path
REPO_COPY: ${{runner.temp}}/build_check/ClickHouse
CACHES_PATH: ${{runner.temp}}/../ccaches
CHECK_NAME: 'ClickHouse build check (actions)'
BUILD_NAME: 'package_release'
run: |
sudo rm -fr $TEMP_PATH
mkdir -p $TEMP_PATH
@ -78,35 +98,41 @@ jobs:
uses: actions/upload-artifact@v2
with:
name: ${{ env.BUILD_NAME }}
path: ${{ runner.temp }}/build_check/${{ env.BUILD_NAME }}.json
path: ${{ env.TEMP_PATH }}/${{ env.BUILD_NAME }}.json
- name: Cleanup
if: always()
run: |
docker kill $(docker ps -q) ||:
docker rm -f $(docker ps -a -q) ||:
sudo rm -fr $TEMP_PATH
sudo rm -fr $TEMP_PATH $CACHES_PATH
BuilderDebAsan:
needs: [DockerHubPush]
runs-on: [self-hosted, builder]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/build_check
IMAGES_PATH=${{runner.temp}}/images_path
REPO_COPY=${{runner.temp}}/build_check/ClickHouse
CACHES_PATH=${{runner.temp}}/../ccaches
CHECK_NAME=ClickHouse build check (actions)
BUILD_NAME=package_asan
EOF
- name: Download changed images
uses: actions/download-artifact@v2
with:
name: changed_images
path: ${{ runner.temp }}/images_path
path: ${{ env.IMAGES_PATH }}
- name: Clear repository
run: |
sudo rm -fr $GITHUB_WORKSPACE && mkdir $GITHUB_WORKSPACE
- name: Check out repository code
uses: actions/checkout@v2
with:
submodules: 'recursive'
submodules: 'true'
fetch-depth: 0 # otherwise we will have no info about contributors
- name: Build
env:
TEMP_PATH: ${{runner.temp}}/build_check
IMAGES_PATH: ${{runner.temp}}/images_path
REPO_COPY: ${{runner.temp}}/build_check/ClickHouse
CACHES_PATH: ${{runner.temp}}/../ccaches
CHECK_NAME: 'ClickHouse build check (actions)'
BUILD_NAME: 'package_asan'
run: |
sudo rm -fr $TEMP_PATH
mkdir -p $TEMP_PATH
@ -116,35 +142,41 @@ jobs:
uses: actions/upload-artifact@v2
with:
name: ${{ env.BUILD_NAME }}
path: ${{ runner.temp }}/build_check/${{ env.BUILD_NAME }}.json
path: ${{ env.TEMP_PATH }}/${{ env.BUILD_NAME }}.json
- name: Cleanup
if: always()
run: |
docker kill $(docker ps -q) ||:
docker rm -f $(docker ps -a -q) ||:
sudo rm -fr $TEMP_PATH
sudo rm -fr $TEMP_PATH $CACHES_PATH
BuilderDebTsan:
needs: [DockerHubPush]
runs-on: [self-hosted, builder]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/build_check
IMAGES_PATH=${{runner.temp}}/images_path
REPO_COPY=${{runner.temp}}/build_check/ClickHouse
CACHES_PATH=${{runner.temp}}/../ccaches
CHECK_NAME=ClickHouse build check (actions)
BUILD_NAME=package_tsan
EOF
- name: Download changed images
uses: actions/download-artifact@v2
with:
name: changed_images
path: ${{ runner.temp }}/images_path
path: ${{ env.IMAGES_PATH }}
- name: Clear repository
run: |
sudo rm -fr $GITHUB_WORKSPACE && mkdir $GITHUB_WORKSPACE
- name: Check out repository code
uses: actions/checkout@v2
with:
submodules: 'recursive'
submodules: 'true'
fetch-depth: 0 # otherwise we will have no info about contributors
- name: Build
env:
TEMP_PATH: ${{runner.temp}}/build_check
IMAGES_PATH: ${{runner.temp}}/images_path
REPO_COPY: ${{runner.temp}}/build_check/ClickHouse
CACHES_PATH: ${{runner.temp}}/../ccaches
CHECK_NAME: 'ClickHouse build check (actions)'
BUILD_NAME: 'package_tsan'
run: |
sudo rm -fr $TEMP_PATH
mkdir -p $TEMP_PATH
@ -154,35 +186,41 @@ jobs:
uses: actions/upload-artifact@v2
with:
name: ${{ env.BUILD_NAME }}
path: ${{ runner.temp }}/build_check/${{ env.BUILD_NAME }}.json
path: ${{ env.TEMP_PATH }}/${{ env.BUILD_NAME }}.json
- name: Cleanup
if: always()
run: |
docker kill $(docker ps -q) ||:
docker rm -f $(docker ps -a -q) ||:
sudo rm -fr $TEMP_PATH
sudo rm -fr $TEMP_PATH $CACHES_PATH
BuilderDebDebug:
needs: [DockerHubPush]
runs-on: [self-hosted, builder]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/build_check
IMAGES_PATH=${{runner.temp}}/images_path
REPO_COPY=${{runner.temp}}/build_check/ClickHouse
CACHES_PATH=${{runner.temp}}/../ccaches
CHECK_NAME=ClickHouse build check (actions)
BUILD_NAME=package_debug
EOF
- name: Download changed images
uses: actions/download-artifact@v2
with:
name: changed_images
path: ${{ runner.temp }}/images_path
path: ${{ env.IMAGES_PATH }}
- name: Clear repository
run: |
sudo rm -fr $GITHUB_WORKSPACE && mkdir $GITHUB_WORKSPACE
- name: Check out repository code
uses: actions/checkout@v2
with:
submodules: 'recursive'
submodules: 'true'
fetch-depth: 0 # otherwise we will have no info about contributors
- name: Build
env:
TEMP_PATH: ${{runner.temp}}/build_check
IMAGES_PATH: ${{runner.temp}}/images_path
REPO_COPY: ${{runner.temp}}/build_check/ClickHouse
CACHES_PATH: ${{runner.temp}}/../ccaches
CHECK_NAME: 'ClickHouse build check (actions)'
BUILD_NAME: 'package_debug'
run: |
sudo rm -fr $TEMP_PATH
mkdir -p $TEMP_PATH
@ -192,13 +230,13 @@ jobs:
uses: actions/upload-artifact@v2
with:
name: ${{ env.BUILD_NAME }}
path: ${{ runner.temp }}/build_check/${{ env.BUILD_NAME }}.json
path: ${{ env.TEMP_PATH }}/${{ env.BUILD_NAME }}.json
- name: Cleanup
if: always()
run: |
docker kill $(docker ps -q) ||:
docker rm -f $(docker ps -a -q) ||:
sudo rm -fr $TEMP_PATH
sudo rm -fr $TEMP_PATH $CACHES_PATH
############################################################################################
##################################### BUILD REPORTER #######################################
############################################################################################
@ -210,17 +248,23 @@ jobs:
- BuilderDebDebug
runs-on: [self-hosted, style-checker]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/report_check
REPORTS_PATH=${{runner.temp}}/reports_dir
CHECK_NAME=ClickHouse build check (actions)
EOF
- name: Download json reports
uses: actions/download-artifact@v2
with:
path: ${{runner.temp}}/reports_dir
path: ${{ env.REPORTS_PATH }}
- name: Clear repository
run: |
sudo rm -fr $GITHUB_WORKSPACE && mkdir $GITHUB_WORKSPACE
- name: Check out repository code
uses: actions/checkout@v2
- name: Report Builder
env:
TEMP_PATH: ${{runner.temp}}/report_check
REPORTS_PATH: ${{runner.temp}}/reports_dir
CHECK_NAME: 'ClickHouse build check (actions)'
run: |
sudo rm -fr $TEMP_PATH
mkdir -p $TEMP_PATH
@ -239,19 +283,25 @@ jobs:
needs: [BuilderDebAsan]
runs-on: [self-hosted, func-tester]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/stateless_debug
REPORTS_PATH=${{runner.temp}}/reports_dir
CHECK_NAME=Stateless tests (address, actions)
REPO_COPY=${{runner.temp}}/stateless_debug/ClickHouse
KILL_TIMEOUT=10800
EOF
- name: Download json reports
uses: actions/download-artifact@v2
with:
path: ${{runner.temp}}/reports_dir
path: ${{ env.REPORTS_PATH }}
- name: Clear repository
run: |
sudo rm -fr $GITHUB_WORKSPACE && mkdir $GITHUB_WORKSPACE
- name: Check out repository code
uses: actions/checkout@v2
- name: Functional test
env:
TEMP_PATH: ${{runner.temp}}/stateless_debug
REPORTS_PATH: ${{runner.temp}}/reports_dir
CHECK_NAME: 'Stateless tests (address, actions)'
REPO_COPY: ${{runner.temp}}/stateless_debug/ClickHouse
KILL_TIMEOUT: 10800
run: |
sudo rm -fr $TEMP_PATH
mkdir -p $TEMP_PATH
@ -271,19 +321,25 @@ jobs:
needs: [BuilderDebDebug]
runs-on: [self-hosted, func-tester]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/stateful_debug
REPORTS_PATH=${{runner.temp}}/reports_dir
CHECK_NAME=Stateful tests (debug, actions)
REPO_COPY=${{runner.temp}}/stateful_debug/ClickHouse
KILL_TIMEOUT=3600
EOF
- name: Download json reports
uses: actions/download-artifact@v2
with:
path: ${{runner.temp}}/reports_dir
path: ${{ env.REPORTS_PATH }}
- name: Clear repository
run: |
sudo rm -fr $GITHUB_WORKSPACE && mkdir $GITHUB_WORKSPACE
- name: Check out repository code
uses: actions/checkout@v2
- name: Functional test
env:
TEMP_PATH: ${{runner.temp}}/stateful_debug
REPORTS_PATH: ${{runner.temp}}/reports_dir
CHECK_NAME: 'Stateful tests (debug, actions)'
REPO_COPY: ${{runner.temp}}/stateful_debug/ClickHouse
KILL_TIMEOUT: 3600
run: |
sudo rm -fr $TEMP_PATH
mkdir -p $TEMP_PATH
@ -301,20 +357,30 @@ jobs:
##############################################################################################
StressTestTsan:
needs: [BuilderDebTsan]
runs-on: [self-hosted, stress-tester]
# func testers have 16 cores + 128 GB memory
# while stress testers have 36 cores + 72 GB memory
# It would be better to have something like 32 cores + 128 GB,
# but such servers are almost unavailable as spot instances.
runs-on: [self-hosted, func-tester]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/stress_thread
REPORTS_PATH=${{runner.temp}}/reports_dir
CHECK_NAME=Stress test (thread, actions)
REPO_COPY=${{runner.temp}}/stress_thread/ClickHouse
EOF
- name: Download json reports
uses: actions/download-artifact@v2
with:
path: ${{runner.temp}}/reports_dir
path: ${{ env.REPORTS_PATH }}
- name: Clear repository
run: |
sudo rm -fr $GITHUB_WORKSPACE && mkdir $GITHUB_WORKSPACE
- name: Check out repository code
uses: actions/checkout@v2
- name: Stress test
env:
TEMP_PATH: ${{runner.temp}}/stress_thread
REPORTS_PATH: ${{runner.temp}}/reports_dir
CHECK_NAME: 'Stress test (thread, actions)'
REPO_COPY: ${{runner.temp}}/stress_thread/ClickHouse
run: |
sudo rm -fr $TEMP_PATH
mkdir -p $TEMP_PATH
@ -334,18 +400,24 @@ jobs:
needs: [BuilderDebRelease]
runs-on: [self-hosted, stress-tester]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/integration_tests_release
REPORTS_PATH=${{runner.temp}}/reports_dir
CHECK_NAME=Integration tests (release, actions)
REPO_COPY=${{runner.temp}}/integration_tests_release/ClickHouse
EOF
- name: Download json reports
uses: actions/download-artifact@v2
with:
path: ${{runner.temp}}/reports_dir
path: ${{ env.REPORTS_PATH }}
- name: Clear repository
run: |
sudo rm -fr $GITHUB_WORKSPACE && mkdir $GITHUB_WORKSPACE
- name: Check out repository code
uses: actions/checkout@v2
- name: Integration test
env:
TEMP_PATH: ${{runner.temp}}/integration_tests_release
REPORTS_PATH: ${{runner.temp}}/reports_dir
CHECK_NAME: 'Integration tests (release, actions)'
REPO_COPY: ${{runner.temp}}/integration_tests_release/ClickHouse
run: |
sudo rm -fr $TEMP_PATH
mkdir -p $TEMP_PATH
@ -369,6 +441,9 @@ jobs:
- CompatibilityCheck
runs-on: [self-hosted, style-checker]
steps:
- name: Clear repository
run: |
sudo rm -fr $GITHUB_WORKSPACE && mkdir $GITHUB_WORKSPACE
- name: Check out repository code
uses: actions/checkout@v2
- name: Finish label


@ -1,4 +1,9 @@
name: Cancel
env:
# Force the stdout and stderr streams to be unbuffered
PYTHONUNBUFFERED: 1
on: # yamllint disable-line rule:truthy
workflow_run:
workflows: ["CIGithubActions", "ReleaseCI", "DocsCheck", "BackportPR"]


@ -1,4 +1,9 @@
name: DocsCheck
env:
# Force the stdout and stderr streams to be unbuffered
PYTHONUNBUFFERED: 1
on: # yamllint disable-line rule:truthy
pull_request:
types:
@ -14,6 +19,9 @@ jobs:
CheckLabels:
runs-on: [self-hosted, style-checker]
steps:
- name: Clear repository
run: |
sudo rm -rf $GITHUB_WORKSPACE && mkdir $GITHUB_WORKSPACE
- name: Check out repository code
uses: actions/checkout@v2
- name: Labels check
@ -24,6 +32,9 @@ jobs:
needs: CheckLabels
runs-on: [self-hosted, style-checker]
steps:
- name: Clear repository
run: |
sudo rm -rf $GITHUB_WORKSPACE && mkdir $GITHUB_WORKSPACE
- name: Check out repository code
uses: actions/checkout@v2
- name: Images check
@ -39,17 +50,23 @@ jobs:
needs: DockerHubPush
runs-on: [self-hosted, func-tester]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/docs_check
REPO_COPY=${{runner.temp}}/docs_check/ClickHouse
EOF
- name: Download changed images
uses: actions/download-artifact@v2
with:
name: changed_images
path: ${{ runner.temp }}/docs_check
path: ${{ env.TEMP_PATH }}
- name: Clear repository
run: |
sudo rm -rf $GITHUB_WORKSPACE && mkdir $GITHUB_WORKSPACE
- name: Check out repository code
uses: actions/checkout@v2
- name: Docs Check
env:
TEMP_PATH: ${{runner.temp}}/docs_check
REPO_COPY: ${{runner.temp}}/docs_check/ClickHouse
run: |
cp -r $GITHUB_WORKSPACE $TEMP_PATH
cd $REPO_COPY/tests/ci

File diff suppressed because it is too large.

File diff suppressed because it is too large.


@ -1,4 +1,9 @@
name: DocsReleaseChecks
env:
# Force the stdout and stderr streams to be unbuffered
PYTHONUNBUFFERED: 1
concurrency:
group: master-release
cancel-in-progress: true
@ -11,10 +16,15 @@ on: # yamllint disable-line rule:truthy
- 'website/**'
- 'benchmark/**'
- 'docker/**'
- '.github/**'
workflow_dispatch:
jobs:
DockerHubPush:
runs-on: [self-hosted, style-checker]
steps:
- name: Clear repository
run: |
sudo rm -fr $GITHUB_WORKSPACE && mkdir $GITHUB_WORKSPACE
- name: Check out repository code
uses: actions/checkout@v2
- name: Images check
@ -30,13 +40,16 @@ jobs:
needs: DockerHubPush
runs-on: [self-hosted, func-tester]
steps:
- name: Clear repository
run: |
sudo rm -fr $GITHUB_WORKSPACE && mkdir $GITHUB_WORKSPACE
- name: Check out repository code
uses: actions/checkout@v2
- name: Download changed images
uses: actions/download-artifact@v2
with:
name: changed_images
path: ${{runner.temp}}/docs_release
path: ${{ env.TEMP_PATH }}
- name: Docs Release
env:
TEMP_PATH: ${{runner.temp}}/docs_release
@ -44,6 +57,8 @@ jobs:
CLOUDFLARE_TOKEN: ${{secrets.CLOUDFLARE}}
ROBOT_CLICKHOUSE_SSH_KEY: ${{secrets.ROBOT_CLICKHOUSE_SSH_KEY}}
run: |
sudo rm -fr $TEMP_PATH
mkdir -p $TEMP_PATH
cp -r $GITHUB_WORKSPACE $TEMP_PATH
cd $REPO_COPY/tests/ci
python3 docs_release.py

File diff suppressed because it is too large.


@ -71,8 +71,8 @@
* Fix the issue that `LowCardinality` of `Int256` cannot be created. [#31832](https://github.com/ClickHouse/ClickHouse/pull/31832) ([alexey-milovidov](https://github.com/alexey-milovidov)).
* Recreate `system.*_log` tables in case of different engine/partition_by. [#31824](https://github.com/ClickHouse/ClickHouse/pull/31824) ([Azat Khuzhin](https://github.com/azat)).
* `MaterializedMySQL`: Fix issue with table named 'table'. [#31781](https://github.com/ClickHouse/ClickHouse/pull/31781) ([Håvard Kvålen](https://github.com/havardk)).
* ClickHouse dictionary source: support named collections. Closes [#31705](https://github.com/ClickHouse/ClickHouse/issues/31705). [#31749](https://github.com/ClickHouse/ClickHouse/pull/31749) ([Kseniia Sumarokova](https://github.com/kssenii)).
* Allow to use named collections configuration for Kafka and RabbitMQ engines (the same way as for other integration table engines). [#31691](https://github.com/ClickHouse/ClickHouse/pull/31691) ([Kseniia Sumarokova](https://github.com/kssenii)).
* ClickHouse dictionary source: support predefined connections. Closes [#31705](https://github.com/ClickHouse/ClickHouse/issues/31705). [#31749](https://github.com/ClickHouse/ClickHouse/pull/31749) ([Kseniia Sumarokova](https://github.com/kssenii)).
* Allow to use predefined connections configuration for Kafka and RabbitMQ engines (the same way as for other integration table engines). [#31691](https://github.com/ClickHouse/ClickHouse/pull/31691) ([Kseniia Sumarokova](https://github.com/kssenii)).
* Always re-render prompt while navigating history in clickhouse-client. This will improve usability of manipulating very long queries that don't fit on screen. [#31675](https://github.com/ClickHouse/ClickHouse/pull/31675) ([alexey-milovidov](https://github.com/alexey-milovidov)) (author: Amos Bird).
* Add key bindings for navigating through history (instead of lines/history). [#31641](https://github.com/ClickHouse/ClickHouse/pull/31641) ([Azat Khuzhin](https://github.com/azat)).
* Improve the `max_execution_time` checks. Fixed some cases when timeout checks do not happen and query could run too long. [#31636](https://github.com/ClickHouse/ClickHouse/pull/31636) ([Raúl Marín](https://github.com/Algunenano)).


@ -82,7 +82,9 @@ PoolWithFailover::PoolWithFailover(
unsigned default_connections_,
unsigned max_connections_,
size_t max_tries_,
uint64_t wait_timeout_)
uint64_t wait_timeout_,
size_t connect_timeout_,
size_t rw_timeout_)
: max_tries(max_tries_)
, shareable(false)
, wait_timeout(wait_timeout_)
@ -93,8 +95,8 @@ PoolWithFailover::PoolWithFailover(
replicas_by_priority[0].emplace_back(std::make_shared<Pool>(database,
host, user, password, port,
/* socket_ = */ "",
MYSQLXX_DEFAULT_TIMEOUT,
MYSQLXX_DEFAULT_RW_TIMEOUT,
connect_timeout_,
rw_timeout_,
default_connections_,
max_connections_));
}


@ -6,6 +6,7 @@
#define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS 1
#define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS 16
#define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES 3
#define MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_CONNECTION_WAIT_TIMEOUT 5 /// in seconds
namespace mysqlxx
@ -121,7 +122,9 @@ namespace mysqlxx
unsigned default_connections_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS,
unsigned max_connections_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS,
size_t max_tries_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES,
uint64_t wait_timeout_ = UINT64_MAX);
uint64_t wait_timeout_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_CONNECTION_WAIT_TIMEOUT,
size_t connect_timeout = MYSQLXX_DEFAULT_TIMEOUT,
size_t rw_timeout = MYSQLXX_DEFAULT_RW_TIMEOUT);
PoolWithFailover(const PoolWithFailover & other);


@ -1,6 +1,6 @@
option(USE_INTERNAL_AZURE_BLOB_STORAGE_LIBRARY
"Set to FALSE to use system Azure SDK instead of bundled (OFF currently not implemented)"
ON)
${ENABLE_LIBRARIES})
if (USE_INTERNAL_AZURE_BLOB_STORAGE_LIBRARY)
set(USE_AZURE_BLOB_STORAGE 1)

contrib/NuRaft vendored

@ -1 +1 @@
Subproject commit d10351f312c1ae1ca3fdda433693dfbef3acfece
Subproject commit bb69d48e0ee35c87a0f19e509a09a914f71f0cff


@ -268,7 +268,7 @@ XMLPUBFUN void XMLCALL xmlCheckVersion(int version);
*
* Whether iconv support is available
*/
#if 1
#if 0
#define LIBXML_ICONV_ENABLED
#endif

debian/rules vendored

@ -45,6 +45,10 @@ ifdef DEB_CXX
ifeq ($(DEB_BUILD_GNU_TYPE),$(DEB_HOST_GNU_TYPE))
CC := $(DEB_CC)
CXX := $(DEB_CXX)
else ifeq (clang,$(findstring clang,$(DEB_CXX)))
# If we crosscompile with clang, it knows what to do
CC := $(DEB_CC)
CXX := $(DEB_CXX)
else
CC := $(DEB_HOST_GNU_TYPE)-$(DEB_CC)
CXX := $(DEB_HOST_GNU_TYPE)-$(DEB_CXX)
@ -77,10 +81,6 @@ else
THREADS_COUNT = 1
endif
ifneq ($(THREADS_COUNT),)
THREADS_COUNT:=-j$(THREADS_COUNT)
endif
%:
dh $@ $(DH_FLAGS) --buildsystem=cmake
@ -89,11 +89,11 @@ override_dh_auto_configure:
override_dh_auto_build:
# Fix for ninja. Do not add -O.
$(MAKE) $(THREADS_COUNT) -C $(BUILDDIR) $(MAKE_TARGET)
$(MAKE) -j$(THREADS_COUNT) -C $(BUILDDIR) $(MAKE_TARGET)
override_dh_auto_test:
ifeq (,$(filter nocheck,$(DEB_BUILD_OPTIONS)))
cd $(BUILDDIR) && ctest $(THREADS_COUNT) -V
cd $(BUILDDIR) && ctest -j$(THREADS_COUNT) -V
endif
override_dh_clean:
@ -120,7 +120,7 @@ override_dh_install:
dh_install --list-missing --sourcedir=$(DESTDIR)
override_dh_auto_install:
env DESTDIR=$(DESTDIR) $(MAKE) $(THREADS_COUNT) -C $(BUILDDIR) install
env DESTDIR=$(DESTDIR) $(MAKE) -j$(THREADS_COUNT) -C $(BUILDDIR) install
override_dh_shlibdeps:
true # We depend only on libc and dh_shlibdeps gives us wrong (too strict) dependency.


@ -24,40 +24,34 @@ RUN apt-get update \
&& apt-key add /tmp/llvm-snapshot.gpg.key \
&& export CODENAME="$(lsb_release --codename --short | tr 'A-Z' 'a-z')" \
&& echo "deb [trusted=yes] https://apt.llvm.org/${CODENAME}/ llvm-toolchain-${CODENAME}-${LLVM_VERSION} main" >> \
/etc/apt/sources.list
/etc/apt/sources.list \
&& apt-get clean
# initial packages
RUN apt-get update \
&& apt-get install \
bash \
fakeroot \
ccache \
curl \
software-properties-common \
--yes --no-install-recommends
RUN apt-get update \
&& apt-get install \
bash \
build-essential \
ccache \
clang-${LLVM_VERSION} \
clang-tidy-${LLVM_VERSION} \
cmake \
curl \
fakeroot \
gdb \
git \
gperf \
clang-${LLVM_VERSION} \
clang-tidy-${LLVM_VERSION} \
lld-${LLVM_VERSION} \
llvm-${LLVM_VERSION} \
llvm-${LLVM_VERSION}-dev \
libicu-dev \
moreutils \
ninja-build \
pigz \
rename \
software-properties-common \
tzdata \
--yes --no-install-recommends
--yes --no-install-recommends \
&& apt-get clean
# This symlink required by gcc to find lld compiler
RUN ln -s /usr/bin/lld-${LLVM_VERSION} /usr/bin/ld.lld
@ -66,7 +60,7 @@ ENV CC=clang-${LLVM_VERSION}
ENV CXX=clang++-${LLVM_VERSION}
# libtapi is required to support .tbh format from recent MacOS SDKs
RUN git clone https://github.com/tpoechtrager/apple-libtapi.git \
RUN git clone --depth 1 https://github.com/tpoechtrager/apple-libtapi.git \
&& cd apple-libtapi \
&& INSTALLPREFIX=/cctools ./build.sh \
&& ./install.sh \
@ -74,7 +68,7 @@ RUN git clone https://github.com/tpoechtrager/apple-libtapi.git \
&& rm -rf apple-libtapi
# Build and install tools for cross-linking to Darwin (x86-64)
RUN git clone https://github.com/tpoechtrager/cctools-port.git \
RUN git clone --depth 1 https://github.com/tpoechtrager/cctools-port.git \
&& cd cctools-port/cctools \
&& ./configure --prefix=/cctools --with-libtapi=/cctools \
--target=x86_64-apple-darwin \
@ -83,7 +77,7 @@ RUN git clone https://github.com/tpoechtrager/cctools-port.git \
&& rm -rf cctools-port
# Build and install tools for cross-linking to Darwin (aarch64)
RUN git clone https://github.com/tpoechtrager/cctools-port.git \
RUN git clone --depth 1 https://github.com/tpoechtrager/cctools-port.git \
&& cd cctools-port/cctools \
&& ./configure --prefix=/cctools --with-libtapi=/cctools \
--target=aarch64-apple-darwin \
@ -97,7 +91,8 @@ RUN wget -nv https://github.com/phracker/MacOSX-SDKs/releases/download/11.3/MacO
# NOTE: Seems like gcc-11 is too new for ubuntu20 repository
RUN add-apt-repository ppa:ubuntu-toolchain-r/test --yes \
&& apt-get update \
&& apt-get install gcc-11 g++-11 --yes
&& apt-get install gcc-11 g++-11 --yes \
&& apt-get clean
COPY build.sh /


@ -64,8 +64,14 @@ RUN add-apt-repository ppa:ubuntu-toolchain-r/test --yes \
&& apt-get install gcc-11 g++-11 --yes
# This symlink required by gcc to find lld compiler
RUN ln -s /usr/bin/lld-${LLVM_VERSION} /usr/bin/ld.lld
# These symlinks are required:
# /usr/bin/ld.lld: by gcc to find lld compiler
# /usr/bin/aarch64-linux-gnu-obj*: for debug symbols stripping
RUN ln -s /usr/bin/lld-${LLVM_VERSION} /usr/bin/ld.lld \
&& ln -sf /usr/lib/llvm-${LLVM_VERSION}/bin/llvm-objcopy /usr/bin/aarch64-linux-gnu-strip \
&& ln -sf /usr/lib/llvm-${LLVM_VERSION}/bin/llvm-objcopy /usr/bin/aarch64-linux-gnu-objcopy \
&& ln -sf /usr/lib/llvm-${LLVM_VERSION}/bin/llvm-objdump /usr/bin/aarch64-linux-gnu-objdump
COPY build.sh /


@ -29,7 +29,13 @@ def pull_image(image_name):
return False
def build_image(image_name, filepath):
subprocess.check_call("docker build --network=host -t {} -f {} .".format(image_name, filepath), shell=True)
context = os.path.dirname(filepath)
subprocess.check_call(
"docker build --network=host -t {} -f {} {}".format(
image_name, filepath, context
),
shell=True,
)
def run_docker_image_with_env(image_name, output, env_variables, ch_root, ccache_dir, docker_image_version):
env_part = " -e ".join(env_variables)
@ -90,6 +96,7 @@ def parse_env_variables(build_type, compiler, sanitizer, package_type, image_typ
elif is_cross_arm:
cc = compiler[:-len(ARM_SUFFIX)]
cmake_flags.append("-DCMAKE_TOOLCHAIN_FILE=/build/cmake/linux/toolchain-aarch64.cmake")
result.append("DEB_ARCH_FLAG=-aarm64")
elif is_cross_freebsd:
cc = compiler[:-len(FREEBSD_SUFFIX)]
cmake_flags.append("-DCMAKE_TOOLCHAIN_FILE=/build/cmake/freebsd/toolchain-x86_64.cmake")
@ -98,6 +105,7 @@ def parse_env_variables(build_type, compiler, sanitizer, package_type, image_typ
cmake_flags.append("-DCMAKE_TOOLCHAIN_FILE=/build/cmake/linux/toolchain-ppc64le.cmake")
else:
cc = compiler
result.append("DEB_ARCH_FLAG=-aamd64")
cxx = cc.replace('gcc', 'g++').replace('clang', 'clang++')


@ -55,7 +55,7 @@ function find_reference_sha
)
for path in "${urls_to_try[@]}"
do
if curl --fail --head "$path"
if curl --fail --retry 5 --retry-delay 1 --retry-max-time 15 --head "$path"
then
found="$path"
break
@ -76,7 +76,7 @@ chmod 777 workspace output
cd workspace
# Download the package for the version we are going to test.
if curl --fail --head "$S3_URL/$PR_TO_TEST/$SHA_TO_TEST$COMMON_BUILD_PREFIX/performance/performance.tgz"
if curl --fail --retry 5 --retry-delay 1 --retry-max-time 15 --head "$S3_URL/$PR_TO_TEST/$SHA_TO_TEST$COMMON_BUILD_PREFIX/performance/performance.tgz"
then
right_path="$S3_URL/$PR_TO_TEST/$SHA_TO_TEST$COMMON_BUILD_PREFIX/performance/performance.tgz"
fi


@ -5,8 +5,7 @@ toc_title: HDFS
# HDFS {#table_engines-hdfs}
This engine provides integration with [Apache Hadoop](https://en.wikipedia.org/wiki/Apache_Hadoop) ecosystem by allowing to manage data on [HDFS](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html) via ClickHouse. This engine is similar
to the [File](../../../engines/table-engines/special/file.md#table_engines-file) and [URL](../../../engines/table-engines/special/url.md#table_engines-url) engines, but provides Hadoop-specific features.
This engine provides integration with the [Apache Hadoop](https://en.wikipedia.org/wiki/Apache_Hadoop) ecosystem by allowing to manage data on [HDFS](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html) via ClickHouse. This engine is similar to the [File](../../../engines/table-engines/special/file.md#table_engines-file) and [URL](../../../engines/table-engines/special/url.md#table_engines-url) engines, but provides Hadoop-specific features.
## Usage {#usage}
@ -14,12 +13,13 @@ to the [File](../../../engines/table-engines/special/file.md#table_engines-file)
ENGINE = HDFS(URI, format)
```
The `URI` parameter is the whole file URI in HDFS.
The `format` parameter specifies one of the available file formats. To perform
**Engine Parameters**
- `URI` - whole file URI in HDFS. The path part of `URI` may contain globs. In this case the table would be readonly.
- `format` - specifies one of the available file formats. To perform
`SELECT` queries, the format must be supported for input, and to perform
`INSERT` queries for output. The available formats are listed in the
[Formats](../../../interfaces/formats.md#formats) section.
The path part of `URI` may contain globs. In this case the table would be readonly.
**Example:**
@ -71,12 +71,12 @@ Constructions with `{}` are similar to the [remote](../../../sql-reference/table
1. Suppose we have several files in TSV format with the following URIs on HDFS:
- 'hdfs://hdfs1:9000/some_dir/some_file_1'
- 'hdfs://hdfs1:9000/some_dir/some_file_2'
- 'hdfs://hdfs1:9000/some_dir/some_file_3'
- 'hdfs://hdfs1:9000/another_dir/some_file_1'
- 'hdfs://hdfs1:9000/another_dir/some_file_2'
- 'hdfs://hdfs1:9000/another_dir/some_file_3'
- 'hdfs://hdfs1:9000/some_dir/some_file_1'
- 'hdfs://hdfs1:9000/some_dir/some_file_2'
- 'hdfs://hdfs1:9000/some_dir/some_file_3'
- 'hdfs://hdfs1:9000/another_dir/some_file_1'
- 'hdfs://hdfs1:9000/another_dir/some_file_2'
- 'hdfs://hdfs1:9000/another_dir/some_file_3'
1. There are several ways to make a table consisting of all six files:
@ -132,6 +132,7 @@ Similar to GraphiteMergeTree, the HDFS engine supports extended configuration us
| **parameter** | **default value** |
| - | - |
| rpc\_client\_connect\_tcpnodelay | true |
| dfs\_client\_read\_shortcircuit | true |
| output\_replace-datanode-on-failure | true |
@ -181,25 +182,26 @@ Similar to GraphiteMergeTree, the HDFS engine supports extended configuration us
#### ClickHouse extras {#clickhouse-extras}
| **parameter** | **default value** |
| - | - |
|hadoop\_kerberos\_keytab | "" |
|hadoop\_kerberos\_principal | "" |
|hadoop\_kerberos\_kinit\_command | kinit |
|libhdfs3\_conf | "" |
### Limitations {#limitations}
* hadoop\_security\_kerberos\_ticket\_cache\_path and libhdfs3\_conf can be global only, not user specific
* `hadoop_security_kerberos_ticket_cache_path` and `libhdfs3_conf` can be global only, not user specific
## Kerberos support {#kerberos-support}
If hadoop\_security\_authentication parameter has value 'kerberos', ClickHouse authentifies via Kerberos facility.
Parameters [here](#clickhouse-extras) and hadoop\_security\_kerberos\_ticket\_cache\_path may be of help.
If the `hadoop_security_authentication` parameter has the value `kerberos`, ClickHouse authenticates via Kerberos.
Parameters are [here](#clickhouse-extras) and `hadoop_security_kerberos_ticket_cache_path` may be of help.
Note that due to libhdfs3 limitations only old-fashioned approach is supported,
datanode communications are not secured by SASL (HADOOP\_SECURE\_DN\_USER is a reliable indicator of such
security approach). Use tests/integration/test\_storage\_kerberized\_hdfs/hdfs_configs/bootstrap.sh for reference.
datanode communications are not secured by SASL (`HADOOP_SECURE_DN_USER` is a reliable indicator of such
security approach). Use `tests/integration/test_storage_kerberized_hdfs/hdfs_configs/bootstrap.sh` for reference.
If hadoop\_kerberos\_keytab, hadoop\_kerberos\_principal or hadoop\_kerberos\_kinit\_command is specified, kinit will be invoked. hadoop\_kerberos\_keytab and hadoop\_kerberos\_principal are mandatory in this case. kinit tool and krb5 configuration files are required.
If `hadoop_kerberos_keytab`, `hadoop_kerberos_principal` or `hadoop_kerberos_kinit_command` is specified, `kinit` will be invoked. `hadoop_kerberos_keytab` and `hadoop_kerberos_principal` are mandatory in this case. `kinit` tool and krb5 configuration files are required.
## HDFS Namenode HA support{#namenode-ha}
## HDFS Namenode HA support {#namenode-ha}
libhdfs3 supports HDFS namenode HA.


@ -66,9 +66,9 @@ WHERE table = 'visits'
└───────────┴────────────────┴────────┘
```
The `partition` column contains the names of the partitions. There are two partitions in this example: `201901` and `201902`. You can use this column value to specify the partition name in [ALTER … PARTITION](#alter_manipulations-with-partitions) queries.
The `partition` column contains the names of the partitions. There are two partitions in this example: `201901` and `201902`. You can use this column value to specify the partition name in [ALTER … PARTITION](../../../sql-reference/statements/alter/partition.md) queries.
The `name` column contains the names of the partition data parts. You can use this column to specify the name of the part in the [ALTER ATTACH PART](#alter_attach-partition) query.
The `name` column contains the names of the partition data parts. You can use this column to specify the name of the part in the [ALTER ATTACH PART](../../../sql-reference/statements/alter/partition.md#alter_attach-partition) query.
Let's break down the name of the first part: `201901_1_3_1`:


@ -39,10 +39,10 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] AS [db2.]name2
- `policy_name` - (optionally) policy name, it will be used to store temporary files for async send
See also:
**See Also**
- [insert_distributed_sync](../../../operations/settings/settings.md#insert_distributed_sync) setting
- [MergeTree](../../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-multiple-volumes) for the examples
- [insert_distributed_sync](../../../operations/settings/settings.md#insert_distributed_sync) setting
- [MergeTree](../../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-multiple-volumes) for the examples
**Distributed Settings**


@ -204,7 +204,7 @@ When parsing with this format, tabs or linefeeds are not allowed in each field.
This format is also available under the name `TSVRawWithNames`.
## TabSeparatedWithNamesAndTypes {#tabseparatedrawwithnamesandtypes}
## TabSeparatedRawWithNamesAndTypes {#tabseparatedrawwithnamesandtypes}
Differs from `TabSeparatedWithNamesAndTypes` format in that the rows are written without escaping.
When parsing with this format, tabs or linefeeds are not allowed in each field.
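As a quick illustration (the table and column names here are made up, not taken from the documentation), the format can be requested explicitly in a query:

``` sql
-- Hypothetical table; the output contains a names row, a types row,
-- and then data rows written without escaping.
SELECT name, value
FROM some_table
FORMAT TabSeparatedRawWithNamesAndTypes
```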


@ -1687,18 +1687,17 @@ Quorum writes
`INSERT` succeeds only when ClickHouse manages to correctly write data to the `insert_quorum` of replicas during the `insert_quorum_timeout`. If for any reason the number of replicas with successful writes does not reach the `insert_quorum`, the write is considered failed and ClickHouse will delete the inserted block from all the replicas where data has already been written.
All the replicas in the quorum are consistent, i.e., they contain data from all previous `INSERT` queries. The `INSERT` sequence is linearized.
When `insert_quorum_parallel` is disabled, all replicas in the quorum are consistent, i.e. they contain data from all previous `INSERT` queries (the `INSERT` sequence is linearized). When reading data written using `insert_quorum` and `insert_quorum_parallel` is disabled, you can turn on sequential consistency for `SELECT` queries using [select_sequential_consistency](#settings-select_sequential_consistency).
When reading the data written from the `insert_quorum`, you can use the [select_sequential_consistency](#settings-select_sequential_consistency) option.
ClickHouse generates an exception
ClickHouse generates an exception:
- If the number of available replicas at the time of the query is less than the `insert_quorum`.
- At an attempt to write data when the previous block has not yet been inserted in the `insert_quorum` of replicas. This situation may occur if the user tries to perform an `INSERT` before the previous one with the `insert_quorum` is completed.
- When `insert_quorum_parallel` is disabled and an attempt to write data is made when the previous block has not yet been inserted in `insert_quorum` of replicas. This situation may occur if the user tries to perform another `INSERT` query to the same table before the previous one with `insert_quorum` is completed.
See also:
- [insert_quorum_timeout](#settings-insert_quorum_timeout)
- [insert_quorum_parallel](#settings-insert_quorum_parallel)
- [select_sequential_consistency](#settings-select_sequential_consistency)
## insert_quorum_timeout {#settings-insert_quorum_timeout}
@ -1710,11 +1709,29 @@ Default value: 600 000 milliseconds (ten minutes).
See also:
- [insert_quorum](#settings-insert_quorum)
- [insert_quorum_parallel](#settings-insert_quorum_parallel)
- [select_sequential_consistency](#settings-select_sequential_consistency)
## insert_quorum_parallel {#settings-insert_quorum_parallel}
Enables or disables parallelism for quorum `INSERT` queries. If enabled, additional `INSERT` queries can be sent while previous queries have not yet finished. If disabled, additional writes to the same table will be rejected.
Possible values:
- 0 — Disabled.
- 1 — Enabled.
Default value: 1.
See also:
- [insert_quorum](#settings-insert_quorum)
- [insert_quorum_timeout](#settings-insert_quorum_timeout)
- [select_sequential_consistency](#settings-select_sequential_consistency)
## select_sequential_consistency {#settings-select_sequential_consistency}
Enables or disables sequential consistency for `SELECT` queries:
Enables or disables sequential consistency for `SELECT` queries. Requires `insert_quorum_parallel` to be disabled (enabled by default).
Possible values:
@ -1727,10 +1744,13 @@ Usage
When sequential consistency is enabled, ClickHouse allows the client to execute the `SELECT` query only for those replicas that contain data from all previous `INSERT` queries executed with `insert_quorum`. If the client refers to a partial replica, ClickHouse will generate an exception. The SELECT query will not include data that has not yet been written to the quorum of replicas.
When `insert_quorum_parallel` is enabled (the default), then `select_sequential_consistency` does not work. This is because parallel `INSERT` queries can be written to different sets of quorum replicas so there is no guarantee a single replica will have received all writes.
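As a rough sketch of how the three settings combine (the table name is illustrative and assumes a `ReplicatedMergeTree` table with at least two replicas):

``` sql
-- Writes must be acknowledged by 2 replicas; parallel quorum inserts are disabled.
SET insert_quorum = 2, insert_quorum_parallel = 0;
INSERT INTO replicated_table VALUES (1, 'a');

-- Reads are then served only by replicas that contain all previous quorum inserts.
SET select_sequential_consistency = 1;
SELECT count() FROM replicated_table;
```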
See also:
- [insert_quorum](#settings-insert_quorum)
- [insert_quorum_timeout](#settings-insert_quorum_timeout)
- [insert_quorum_parallel](#settings-insert_quorum_parallel)
## insert_deduplicate {#settings-insert-deduplicate}


@ -22,7 +22,7 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
) ENGINE = engine
```
Creates a table named `name` in the `db` database or the current database if `db` is not set, with the structure specified in brackets and the `engine` engine.
Creates a table named `table_name` in the `db` database or the current database if `db` is not set, with the structure specified in brackets and the `engine` engine.
The structure of the table is a list of column descriptions, secondary indexes and constraints. If [primary key](#primary-key) is supported by the engine, it will be indicated as a parameter for the table engine.
A column description is `name type` in the simplest case. Example: `RegionID UInt32`.
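For instance, a minimal concrete version of the template above might look like this (the database, table, second column, and engine are illustrative, not part of the original text):

``` sql
CREATE TABLE IF NOT EXISTS db.table_name
(
    RegionID UInt32,
    EventDate Date
)
ENGINE = MergeTree
ORDER BY (EventDate, RegionID)
```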


@ -298,13 +298,16 @@ Note that elements emitted by a late firing should be treated as updated results
### Monitoring New Windows {#window-view-monitoring}
Window view supports the `WATCH` query to constantly append the processing results to the console or use `TO` syntax to output the results to a table.
Window view supports the [WATCH](../../../sql-reference/statements/watch.md) query to monitor changes, or the `TO` syntax to output the results to a table.
``` sql
WATCH [db.]name [LIMIT n]
WATCH [db.]window_view
[EVENTS]
[LIMIT n]
[FORMAT format]
```
`WATCH` query acts similar as in `LIVE VIEW`. A `LIMIT` can be specified to set the number of updates to receive before terminating the query.
The `WATCH` query acts similarly to the one in `LIVE VIEW`. A `LIMIT` can be specified to set the number of updates to receive before terminating the query. The `EVENTS` clause can be used to obtain a short form of the `WATCH` query, where instead of the query result you will just get the latest query watermark.
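A short sketch (the window view name `db.wv` is illustrative):

``` sql
-- Receive only the watermarks of the next three fires, then stop watching.
WATCH db.wv EVENTS LIMIT 3
```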
### Settings {#window-view-settings}


@ -206,6 +206,9 @@ This extra row is only produced in `JSON*`, `TabSeparated*`, and `Pretty*` forma
- In `Pretty*` formats, the row is output as a separate table after the main result.
- In the other formats it is not available.
!!! note "Note"
totals is output in the results of `SELECT` queries, and is not output in `INSERT INTO ... SELECT`.
`WITH TOTALS` can be run in different ways when [HAVING](../../../sql-reference/statements/select/having.md) is present. The behavior depends on the `totals_mode` setting.
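A minimal sketch using the table `t` from the surrounding examples:

``` sql
-- The extra totals row is appended after the main result in the formats listed above.
SELECT year, month, count(*)
FROM t
GROUP BY year, month WITH TOTALS
```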
### Configuring Totals Processing {#configuring-totals-processing}


@ -203,6 +203,9 @@ SELECT year, month, day, count(*) FROM t GROUP BY year, month, day WITH CUBE;
- In `Pretty*` formats, the row is output as a separate table after the main result.
- In other formats it is not available.
!!! note "Note"
    totals is output only in the results of `SELECT` queries and is not output in `INSERT INTO ... SELECT`.
When the [HAVING](having.md) clause is used, the behavior of `WITH TOTALS` is controlled by the `totals_mode` setting.
### Configuring Totals Processing {#configuring-totals-processing}


@ -727,7 +727,6 @@ void LocalServer::printHelpMessage([[maybe_unused]] const OptionsDescription & o
void LocalServer::addOptions(OptionsDescription & options_description)
{
options_description.main_description->add_options()
("database,d", po::value<std::string>(), "database")
("table,N", po::value<std::string>(), "name of the initial table")
/// If structure argument is omitted then initial query is not generated


@ -293,6 +293,10 @@
<max_thread_pool_size>10000</max_thread_pool_size>
<!-- Number of workers to recycle connections in background (see also drain_timeout).
If the pool is full, connection will be drained synchronously. -->
<!-- <max_threads_for_connection_collector>10</max_threads_for_connection_collector> -->
<!-- On memory constrained environments you may have to set this to value larger than 1.
-->
<max_server_memory_usage_to_ram_ratio>0.9</max_server_memory_usage_to_ram_ratio>


@ -87,7 +87,7 @@ if [ -z "$NO_BUILD" ] ; then
# Build (only binary packages).
debuild --preserve-env -e PATH \
-e DEB_CC=$DEB_CC -e DEB_CXX=$DEB_CXX -e CMAKE_FLAGS="$CMAKE_FLAGS" \
-b ${DEBUILD_NOSIGN_OPTIONS} ${DEBUILD_NODEPS_OPTIONS}
-b ${DEBUILD_NOSIGN_OPTIONS} ${DEBUILD_NODEPS_OPTIONS} ${DEB_ARCH_FLAG}
fi
if [ -n "$MAKE_RPM" ]; then


@ -25,7 +25,12 @@ struct PocoSocketWrapper : public Poco::Net::SocketImpl
void IConnections::DrainCallback::operator()(int fd, Poco::Timespan, const std::string fd_description) const
{
if (!PocoSocketWrapper(fd).poll(drain_timeout, Poco::Net::Socket::SELECT_READ))
throw Exception(ErrorCodes::SOCKET_TIMEOUT, "Read timeout while draining from {}", fd_description);
{
throw Exception(ErrorCodes::SOCKET_TIMEOUT,
"Read timeout ({} ms) while draining from {}",
drain_timeout.totalMilliseconds(),
fd_description);
}
}
}


@ -395,17 +395,17 @@ MultiplexedConnections::ReplicaState & MultiplexedConnections::getReplicaForRead
read_list.push_back(*connection->socket);
}
auto timeout = is_draining ? drain_timeout : receive_timeout;
int n = Poco::Net::Socket::select(
read_list,
write_list,
except_list,
is_draining ? drain_timeout : receive_timeout);
timeout);
/// We treat any error as timeout for simplicity.
/// And we also check if read_list is still empty just in case.
if (n <= 0 || read_list.empty())
{
auto err_msg = fmt::format("Timeout exceeded while reading from {}", dumpAddressesUnlocked());
for (ReplicaState & state : replica_states)
{
Connection * connection = state.connection;
@ -415,7 +415,10 @@ MultiplexedConnections::ReplicaState & MultiplexedConnections::getReplicaForRead
invalidateReplica(state);
}
}
throw Exception(err_msg, ErrorCodes::TIMEOUT_EXCEEDED);
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED,
"Timeout ({} ms) exceeded while reading from {}",
timeout.totalMilliseconds(),
dumpAddressesUnlocked());
}
}


@ -0,0 +1,17 @@
#include <Common/getRandomASCIIString.h>
#include <Common/thread_local_rng.h>
#include <random>
namespace DB
{
String getRandomASCIIString(size_t len, char first, char last)
{
std::uniform_int_distribution<int> distribution(first, last);
String res(len, ' ');
for (auto & c : res)
c = distribution(thread_local_rng);
return res;
}
}


@ -0,0 +1,10 @@
#pragma once
#include <Core/Types.h>
namespace DB
{
/// Slow random string. Useful for random names and things like this. Not for
/// generating data.
String getRandomASCIIString(size_t len = 32, char first = 'a', char last = 'z');
}


@ -55,7 +55,7 @@ class IColumn;
M(Milliseconds, connect_timeout_with_failover_secure_ms, 100, "Connection timeout for selecting first healthy replica (for secure connections).", 0) \
M(Seconds, receive_timeout, DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, "", 0) \
M(Seconds, send_timeout, DBMS_DEFAULT_SEND_TIMEOUT_SEC, "", 0) \
M(Seconds, drain_timeout, 3, "", 0) \
M(Seconds, drain_timeout, 3, "Timeout for draining remote connections, -1 means synchronous drain w/o ignoring errors", 0) \
M(Seconds, tcp_keep_alive_timeout, 290 /* less than DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC */, "The time in seconds the connection needs to remain idle before TCP starts sending keepalive probes", 0) \
M(Milliseconds, hedged_connection_timeout_ms, 100, "Connection timeout for establishing connection with replica for Hedged requests", 0) \
M(Milliseconds, receive_data_timeout_ms, 2000, "Connection timeout for receiving first packet of data or packet with positive progress from replica", 0) \
@ -496,8 +496,12 @@ class IColumn;
M(Bool, database_replicated_always_detach_permanently, false, "Execute DETACH TABLE as DETACH TABLE PERMANENTLY if database engine is Replicated", 0) \
M(DistributedDDLOutputMode, distributed_ddl_output_mode, DistributedDDLOutputMode::THROW, "Format of distributed DDL query result", 0) \
M(UInt64, distributed_ddl_entry_format_version, 1, "Version of DDL entry to write into ZooKeeper", 0) \
\
M(UInt64, external_storage_max_read_rows, 0, "Limit maximum number of rows when table with external engine should flush history data. Now supported only for MySQL table engine, database engine, dictionary and MaterializedMySQL. If equal to 0, this setting is disabled", 0) \
M(UInt64, external_storage_max_read_bytes, 0, "Limit maximum number of bytes when table with external engine should flush history data. Now supported only for MySQL table engine, database engine, dictionary and MaterializedMySQL. If equal to 0, this setting is disabled", 0) \
M(UInt64, external_storage_connect_timeout_sec, DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, "Connect timeout in seconds. Now supported only for MySQL", 0) \
M(UInt64, external_storage_rw_timeout_sec, DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, "Read/write timeout in seconds. Now supported only for MySQL", 0) \
\
M(UnionMode, union_default_mode, UnionMode::Unspecified, "Set default Union Mode in SelectWithUnion query. Possible values: empty string, 'ALL', 'DISTINCT'. If empty, query without Union Mode will throw exception.", 0) \
M(Bool, optimize_aggregators_of_group_by_keys, true, "Eliminates min/max/any/anyLast aggregators of GROUP BY keys in SELECT section", 0) \
M(Bool, optimize_group_by_function_keys, true, "Eliminates functions of other keys in GROUP BY section", 0) \


@ -23,6 +23,8 @@
# include <Databases/MySQL/ConnectionMySQLSettings.h>
# include <Databases/MySQL/DatabaseMySQL.h>
# include <Databases/MySQL/MaterializedMySQLSettings.h>
# include <Storages/MySQL/MySQLHelpers.h>
# include <Storages/MySQL/MySQLSettings.h>
# include <Databases/MySQL/DatabaseMaterializedMySQL.h>
# include <mysqlxx/Pool.h>
#endif
@ -198,13 +200,15 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
if (engine_name == "MySQL")
{
auto mysql_database_settings = std::make_unique<ConnectionMySQLSettings>();
auto mysql_pool = mysqlxx::PoolWithFailover(configuration.database, configuration.addresses, configuration.username, configuration.password);
MySQLSettings mysql_settings;
auto mysql_pool = createMySQLPoolWithFailover(configuration, mysql_settings);
mysql_database_settings->loadFromQueryContext(context);
mysql_database_settings->loadFromQuery(*engine_define); /// higher priority
return std::make_shared<DatabaseMySQL>(
context, database_name, metadata_path, engine_define, configuration.database, std::move(mysql_database_settings), std::move(mysql_pool));
context, database_name, metadata_path, engine_define, configuration.database,
std::move(mysql_database_settings), std::move(mysql_pool), create.attach);
}
MySQLClient client(configuration.host, configuration.port, configuration.username, configuration.password);


@ -14,7 +14,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
IMPLEMENT_SETTINGS_TRAITS(ConnectionMySQLSettingsTraits, LIST_OF_CONNECTION_MYSQL_SETTINGS)
IMPLEMENT_SETTINGS_TRAITS(ConnectionMySQLSettingsTraits, LIST_OF_MYSQL_DATABASE_SETTINGS)
void ConnectionMySQLSettings::loadFromQuery(ASTStorage & storage_def)
{


@ -4,6 +4,7 @@
#include <Core/Defines.h>
#include <Core/SettingsEnums.h>
#include <Interpreters/Context_fwd.h>
#include <Storages/MySQL/MySQLSettings.h>
namespace DB
{
@ -17,7 +18,11 @@ class ASTStorage;
#define APPLY_FOR_IMMUTABLE_CONNECTION_MYSQL_SETTINGS(M) \
M(mysql_datatypes_support_level)
DECLARE_SETTINGS_TRAITS(ConnectionMySQLSettingsTraits, LIST_OF_CONNECTION_MYSQL_SETTINGS)
#define LIST_OF_MYSQL_DATABASE_SETTINGS(M) \
LIST_OF_CONNECTION_MYSQL_SETTINGS(M) \
LIST_OF_MYSQL_SETTINGS(M)
DECLARE_SETTINGS_TRAITS(ConnectionMySQLSettingsTraits, LIST_OF_MYSQL_DATABASE_SETTINGS)
/** Settings for the MySQL database engine.


@ -53,7 +53,8 @@ DatabaseMySQL::DatabaseMySQL(
const ASTStorage * database_engine_define_,
const String & database_name_in_mysql_,
std::unique_ptr<ConnectionMySQLSettings> settings_,
mysqlxx::PoolWithFailover && pool)
mysqlxx::PoolWithFailover && pool,
bool attach)
: IDatabase(database_name_)
, WithContext(context_->getGlobalContext())
, metadata_path(metadata_path_)
@ -62,7 +63,19 @@ DatabaseMySQL::DatabaseMySQL(
, database_settings(std::move(settings_))
, mysql_pool(std::move(pool))
{
empty(); /// test database is works fine.
try
{
/// Test that the database is working fine; it will also fetch tables.
empty();
}
catch (...)
{
if (attach)
tryLogCurrentException("DatabaseMySQL");
else
throw;
}
thread = ThreadFromGlobalPool{&DatabaseMySQL::cleanOutdatedTables, this};
}


@ -45,7 +45,8 @@ public:
const ASTStorage * database_engine_define,
const String & database_name_in_mysql,
std::unique_ptr<ConnectionMySQLSettings> settings_,
mysqlxx::PoolWithFailover && pool);
mysqlxx::PoolWithFailover && pool,
bool attach);
String getEngineName() const override { return "MySQL"; }


@ -14,6 +14,8 @@
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/QueryPipeline.h>
#include <Storages/ExternalDataSourceConfiguration.h>
#include <Storages/MySQL/MySQLHelpers.h>
#include <Storages/MySQL/MySQLSettings.h>
namespace DB
@ -46,13 +48,17 @@ void registerDictionarySourceMysql(DictionarySourceFactory & factory)
auto settings_config_prefix = config_prefix + ".mysql";
std::shared_ptr<mysqlxx::PoolWithFailover> pool;
ExternalDataSourceConfiguration configuration;
StorageMySQLConfiguration configuration;
auto named_collection = created_from_ddl ? getExternalDataSourceConfiguration(config, settings_config_prefix, global_context) : std::nullopt;
if (named_collection)
{
configuration = *named_collection;
std::vector<std::pair<String, UInt16>> addresses{std::make_pair(configuration.host, configuration.port)};
pool = std::make_shared<mysqlxx::PoolWithFailover>(configuration.database, addresses, configuration.username, configuration.password);
configuration.set(*named_collection);
configuration.addresses = {std::make_pair(configuration.host, configuration.port)};
MySQLSettings mysql_settings;
const auto & settings = global_context->getSettingsRef();
mysql_settings.connect_timeout = settings.external_storage_connect_timeout_sec;
mysql_settings.read_write_timeout = settings.external_storage_rw_timeout_sec;
pool = std::make_shared<mysqlxx::PoolWithFailover>(createMySQLPoolWithFailover(configuration, mysql_settings));
}
else
{


@ -7,6 +7,7 @@
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/WriteIndirectBufferFromRemoteFS.h>
#include <Common/getRandomASCIIString.h>
namespace DB
@ -93,7 +94,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskBlobStorage::writeFile(
WriteMode mode)
{
auto metadata = readOrCreateMetaForWriting(path, mode);
auto blob_path = path + "_" + getRandomName(8); /// NOTE: path contains the tmp_* prefix in the blob name
auto blob_path = path + "_" + getRandomASCIIString(8); /// NOTE: path contains the tmp_* prefix in the blob name
LOG_TRACE(log, "{} to file by path: {}. Blob Storage path: {}",
mode == WriteMode::Rewrite ? "Write" : "Append", backQuote(metadata_disk->getPath() + path), blob_path);


@ -1,4 +1,5 @@
#include <Disks/RemoteDisksCommon.h>
#include <Common/getRandomASCIIString.h>
namespace DB
{
@ -8,17 +9,6 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
String getRandomName(size_t len, char first, char last)
{
std::uniform_int_distribution<int> distribution(first, last);
String res(len, ' ');
for (auto & c : res)
c = distribution(thread_local_rng);
return res;
}
std::shared_ptr<DiskCacheWrapper> wrapWithCache(
std::shared_ptr<IDisk> disk, String cache_name, String cache_path, String metadata_path)
{


@ -6,13 +6,12 @@
#include <Common/thread_local_rng.h>
#include <Disks/IDisk.h>
#include <Disks/DiskCacheWrapper.h>
#include <Common/getRandomASCIIString.h>
namespace DB
{
String getRandomName(size_t len = 32, char first = 'a', char last = 'z');
std::shared_ptr<DiskCacheWrapper> wrapWithCache(
std::shared_ptr<IDisk> disk, String cache_name, String cache_path, String metadata_path);


@ -16,6 +16,7 @@
#include <Common/createHardLink.h>
#include <Common/quoteString.h>
#include <Common/thread_local_rng.h>
#include <Common/getRandomASCIIString.h>
#include <Interpreters/Context.h>
#include <IO/ReadBufferFromS3.h>
@ -246,7 +247,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
auto metadata = readOrCreateMetaForWriting(path, mode);
/// Path to store new S3 object.
auto s3_path = getRandomName();
auto s3_path = getRandomASCIIString();
std::optional<ObjectMetadata> object_metadata;
if (settings->send_metadata)


@ -2307,10 +2307,9 @@ namespace
if (parent_field_descriptor)
out << " field " << quoteString(parent_field_descriptor->full_name()) << " (" << parent_field_descriptor->type_name() << ")";
for (size_t i = 0; i != field_infos.size(); ++i)
for (const auto & field_info : field_infos)
{
out << "\n";
const auto & field_info = field_infos[i];
writeIndent(out, indent + 1) << "Columns #";
for (size_t j = 0; j != field_info.column_indices.size(); ++j)
{
@ -3017,8 +3016,11 @@ namespace
if (nested_message_serializer)
{
std::vector<std::string_view> column_names_used;
column_names_used.reserve(used_column_indices_in_nested.size());
for (size_t i : used_column_indices_in_nested)
column_names_used.emplace_back(nested_column_names[i]);
auto field_serializer = std::make_unique<ProtobufSerializerFlattenedNestedAsArrayOfNestedMessages>(
std::move(column_names_used), field_descriptor, std::move(nested_message_serializer), get_root_desc_function);
transformColumnIndices(used_column_indices_in_nested, nested_column_indices);


@ -8,7 +8,7 @@ namespace DB
{
void registerFunctionBase64Decode(FunctionFactory & factory)
{
tb64ini(0, 1);
tb64ini(0, 0);
factory.registerFunction<FunctionBase64Conversion<Base64Decode>>();
/// MySQL compatibility alias.

View File

@ -10,7 +10,7 @@ namespace DB
{
void registerFunctionBase64Encode(FunctionFactory & factory)
{
tb64ini(0, 1);
tb64ini(0, 0);
factory.registerFunction<FunctionBase64Conversion<Base64Encode>>();
/// MySQL compatibility alias.

View File

@ -18,6 +18,7 @@ namespace ErrorCodes
extern const int ILLEGAL_COLUMN;
extern const int DECIMAL_OVERFLOW;
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int LOGICAL_ERROR;
}
@ -142,6 +143,7 @@ public:
else if (const ColumnFixedString * col_in_fixed = checkAndGetColumn<ColumnFixedString>(col_in_untyped.get()))
{
const auto n = col_in_fixed->getN();
const auto col_in_rows = col_in_fixed->size();
auto col_to = ColumnFixedString::create(n);
ColumnFixedString::Chars & chars_to = col_to->getChars();
@ -153,7 +155,16 @@ public:
const auto * ptr_in = col_in_fixed->getChars().data();
auto * ptr_to = chars_to.data();
fuzzBits(ptr_in, ptr_to, chars_to.size(), inverse_probability);
if (col_in_rows >= input_rows_count)
fuzzBits(ptr_in, ptr_to, chars_to.size(), inverse_probability);
else if (col_in_rows != 1)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"1 != col_in_rows {} < input_rows_count {}", col_in_rows, input_rows_count);
else
for (size_t i = 0; i < input_rows_count; ++i)
fuzzBits(ptr_in, ptr_to + i * n, n, inverse_probability);
return col_to;
}

View File

@ -68,7 +68,7 @@ bool ReadBufferFromBlobStorage::nextImpl()
data_capacity = internal_buffer.size();
}
size_t to_read_bytes = std::min(total_size - offset, data_capacity);
size_t to_read_bytes = std::min(static_cast<size_t>(total_size - offset), data_capacity);
size_t bytes_read = 0;
size_t sleep_time_with_backoff_milliseconds = 100;

View File

@ -427,7 +427,7 @@ namespace detail
LOG_ERROR(log,
"HTTP request to `{}` failed at try {}/{} with bytes read: {}/{}. "
"Error: {}. (Current backoff wait is {}/{} ms)",
uri.toString(), i, settings.http_max_tries,
uri.toString(), i + 1, settings.http_max_tries,
getOffset(), read_range.end ? toString(*read_range.end) : "unknown",
e.displayText(),
milliseconds_to_wait, settings.http_retry_max_backoff_ms);

View File

@ -6,6 +6,7 @@
#include <IO/WriteBufferFromBlobStorage.h>
#include <Disks/RemoteDisksCommon.h>
#include <Common/getRandomASCIIString.h>
namespace DB
@ -42,7 +43,7 @@ void WriteBufferFromBlobStorage::nextImpl()
{
auto part_len = std::min(len - read, max_single_part_upload_size);
auto block_id = getRandomName(64);
auto block_id = getRandomASCIIString(64);
block_ids.push_back(block_id);
Azure::Core::IO::MemoryBodyStream tmp_buffer(reinterpret_cast<uint8_t *>(buffer_begin + read), part_len);

View File

@ -57,9 +57,14 @@ BlockIO InterpreterCreateFunctionQuery::execute()
void InterpreterCreateFunctionQuery::validateFunction(ASTPtr function, const String & name)
{
const auto * args_tuple = function->as<ASTFunction>()->arguments->children.at(0)->as<ASTFunction>();
auto & lambda_function = function->as<ASTFunction &>();
auto & lambda_function_expression_list = lambda_function.arguments->children;
const auto & tuple_function_arguments = lambda_function_expression_list.at(0)->as<ASTFunction &>();
std::unordered_set<String> arguments;
for (const auto & argument : args_tuple->arguments->children)
for (const auto & argument : tuple_function_arguments.arguments->children)
{
const auto & argument_name = argument->as<ASTIdentifier>()->name();
auto [_, inserted] = arguments.insert(argument_name);
@ -67,7 +72,7 @@ void InterpreterCreateFunctionQuery::validateFunction(ASTPtr function, const Str
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Identifier {} already used as function parameter", argument_name);
}
ASTPtr function_body = function->as<ASTFunction>()->children.at(0)->children.at(1);
ASTPtr function_body = lambda_function_expression_list.at(1);
validateFunctionRecursiveness(function_body, name);
}
@ -82,5 +87,4 @@ void InterpreterCreateFunctionQuery::validateFunctionRecursiveness(ASTPtr node,
validateFunctionRecursiveness(child, function_to_create);
}
}
}
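
For orientation, a sketch of the AST shape the rewritten validation walks, assuming a statement such as CREATE FUNCTION linear AS (x, k, b) -> k * x + b (the function and parameter names are illustrative):

#include <Parsers/ASTFunction.h>
#include <Parsers/IAST_fwd.h>

/// Sketch only: the "(x, k, b) -> k * x + b" part is stored as a "lambda" ASTFunction,
/// whose first argument is the "tuple" of parameters and whose second argument is the body.
void inspectCreateFunctionAST(const DB::ASTPtr & function)
{
    auto & lambda = function->as<DB::ASTFunction &>();
    const auto & parameters = lambda.arguments->children.at(0)->as<DB::ASTFunction &>(); /// tuple: x, k, b
    DB::ASTPtr body = lambda.arguments->children.at(1);                                  /// k * x + b
    (void)parameters;
    (void)body;
}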

View File

@ -196,6 +196,9 @@ Chain InterpreterInsertQuery::buildChainImpl(
/// We create a pipeline of several streams, into which we will write data.
Chain out;
/// Keep a reference to the context to make sure it stays alive until the chain is executed and destroyed
out.addInterpreterContext(context_ptr);
/// NOTE: we explicitly ignore bound materialized views when inserting into Kafka Storage.
/// Otherwise we'll get duplicates when the MV reads the same rows again from Kafka.
if (table->noPushingToViews() && !no_destination)

View File

@ -113,8 +113,10 @@ String InterpreterSelectQuery::generateFilterActions(ActionsDAGPtr & actions, co
select_ast->setExpression(ASTSelectQuery::Expression::SELECT, std::make_shared<ASTExpressionList>());
auto expr_list = select_ast->select();
// The first column is our filter expression.
expr_list->children.push_back(row_policy_filter);
/// The first column is our filter expression.
/// The row_policy_filter should be cloned, because it may be changed by TreeRewriter,
/// which can turn it into an invalid expression, even though it stays valid in the whole select.
expr_list->children.push_back(row_policy_filter->clone());
/// Keep columns that are required after the filter actions.
for (const auto & column_str : prerequisite_columns)
@ -386,7 +388,9 @@ InterpreterSelectQuery::InterpreterSelectQuery(
query.setFinal();
/// Save scalar subqueries' results in the query context
if (!options.only_analyze && context->hasQueryContext())
/// But discard them if the Storage has been modified
/// In an ideal situation we would only discard the scalars affected by the storage change
if (!options.only_analyze && context->hasQueryContext() && !context->getViewSource())
for (const auto & it : syntax_analyzer_result->getScalars())
context->getQueryContext()->addScalar(it.first, it.second);

View File

@ -440,7 +440,7 @@ static ASTPtr tryGetTableOverride(const String & mapped_database, const String &
if (auto database_ptr = DatabaseCatalog::instance().tryGetDatabase(mapped_database))
{
auto create_query = database_ptr->getCreateDatabaseQuery();
if (auto create_database_query = create_query->as<ASTCreateQuery>())
if (auto * create_database_query = create_query->as<ASTCreateQuery>())
{
if (create_database_query->table_overrides)
{
@ -537,8 +537,8 @@ ASTs InterpreterCreateImpl::getRewrittenQueries(
if (auto table_override = tryGetTableOverride(mapped_to_database, create_query.table))
{
auto override = table_override->as<ASTTableOverride>();
override->applyToCreateTableQuery(rewritten_query.get());
auto * override_ast = table_override->as<ASTTableOverride>();
override_ast->applyToCreateTableQuery(rewritten_query.get());
}
return ASTs{rewritten_query};

View File

@ -653,7 +653,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
std::unique_ptr<OpenTelemetrySpanHolder> span;
if (context->query_trace_context.trace_id != UUID())
{
auto raw_interpreter_ptr = interpreter.get();
auto * raw_interpreter_ptr = interpreter.get();
std::string class_name(abi::__cxa_demangle(typeid(*raw_interpreter_ptr).name(), nullptr, nullptr, nullptr));
span = std::make_unique<OpenTelemetrySpanHolder>(class_name + "::execute()");
}

View File

@ -427,8 +427,11 @@ void ASTCreateQuery::formatQueryImpl(const FormatSettings & settings, FormatStat
if (select)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " AS" << settings.nl_or_ws << (settings.hilite ? hilite_none : "");
settings.ostr << (settings.hilite ? hilite_keyword : "") << " AS"
<< (comment ? "(" : "")
<< settings.nl_or_ws << (settings.hilite ? hilite_none : "");
select->formatImpl(settings, state, frame);
settings.ostr << (comment ? ")" : "");
}
if (tables)

View File

@ -40,7 +40,7 @@ public:
String getID(char) const override { return "TableOverrideList"; }
ASTPtr clone() const override;
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
void setTableOverride(const String & name, ASTPtr override);
void setTableOverride(const String & name, const ASTPtr ast);
void removeTableOverride(const String & name);
ASTPtr tryGetTableOverride(const String & name) const;
bool hasOverride(const String & name) const;

View File

@ -747,6 +747,7 @@ bool ParserCreateLiveViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & e
if (!select_p.parse(pos, select, expected))
return false;
auto comment = parseComment(pos, expected);
auto query = std::make_shared<ASTCreateQuery>();
node = query;
@ -781,6 +782,9 @@ bool ParserCreateLiveViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & e
if (live_view_periodic_refresh)
query->live_view_periodic_refresh.emplace(live_view_periodic_refresh->as<ASTLiteral &>().value.safeGet<UInt64>());
if (comment)
query->set(query->comment, comment);
return true;
}

View File

@ -83,16 +83,16 @@ TEST_P(TableOverrideTest, applyOverrides)
ASSERT_NE(nullptr, database);
ASTPtr table_ast;
ASSERT_NO_THROW(table_ast = parseQuery(parser, table_query, 0, 0));
auto table = table_ast->as<ASTCreateQuery>();
auto * table = table_ast->as<ASTCreateQuery>();
ASSERT_NE(nullptr, table);
auto table_name = table->table->as<ASTIdentifier>()->name();
if (database->table_overrides)
{
auto override_ast = database->table_overrides->tryGetTableOverride(table_name);
ASSERT_NE(nullptr, override_ast);
auto override = override_ast->as<ASTTableOverride>();
ASSERT_NE(nullptr, override);
override->applyToCreateTableQuery(table);
auto * override_table_ast = override_ast->as<ASTTableOverride>();
ASSERT_NE(nullptr, override_table_ast);
override_table_ast->applyToCreateTableQuery(table);
}
EXPECT_EQ(expected_query, serializeAST(*table));
}

View File

@ -59,11 +59,12 @@ namespace DB
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int DUPLICATE_COLUMN;
extern const int THERE_IS_NO_COLUMN;
extern const int UNKNOWN_EXCEPTION;
extern const int UNKNOWN_TYPE;
extern const int VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE;
extern const int THERE_IS_NO_COLUMN;
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_EXCEPTION;
}
@ -519,9 +520,11 @@ ArrowColumnToCHColumn::ArrowColumnToCHColumn(
void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table)
{
NameToColumnPtr name_to_column_ptr;
for (const auto& column_name : table->ColumnNames())
for (const auto & column_name : table->ColumnNames())
{
std::shared_ptr<arrow::ChunkedArray> arrow_column = table->GetColumnByName(column_name);
if (!arrow_column)
throw Exception(ErrorCodes::DUPLICATE_COLUMN, "Column '{}' is duplicated", column_name);
name_to_column_ptr[column_name] = arrow_column;
}

View File

@ -24,14 +24,14 @@ static FormatSettings updateFormatSettings(const FormatSettings & settings)
CustomSeparatedRowInputFormat::CustomSeparatedRowInputFormat(
const Block & header_,
ReadBuffer & in_,
ReadBuffer & in_buf_,
const Params & params_,
bool with_names_,
bool with_types_,
bool ignore_spaces_,
const FormatSettings & format_settings_)
: CustomSeparatedRowInputFormat(
header_, std::make_unique<PeekableReadBuffer>(in_), params_, with_names_, with_types_, ignore_spaces_, format_settings_)
header_, std::make_unique<PeekableReadBuffer>(in_buf_), params_, with_names_, with_types_, ignore_spaces_, format_settings_)
{
}

View File

@ -25,7 +25,7 @@ public:
private:
CustomSeparatedRowInputFormat(
const Block & header_,
std::unique_ptr<PeekableReadBuffer> in_,
std::unique_ptr<PeekableReadBuffer> in_buf_,
const Params & params_,
bool with_names_, bool with_types_, bool ignore_spaces_, const FormatSettings & format_settings_);
using EscapingRule = FormatSettings::EscapingRule;

View File

@ -1,5 +1,6 @@
#pragma once
#include <Interpreters/Context_fwd.h>
#include <Processors/IProcessor.h>
#include <QueryPipeline/PipelineResourcesHolder.h>
@ -42,6 +43,7 @@ public:
void addTableLock(TableLockHolder lock) { holder.table_locks.emplace_back(std::move(lock)); }
void addStorageHolder(StoragePtr storage) { holder.storage_holders.emplace_back(std::move(storage)); }
void attachResources(PipelineResourcesHolder holder_) { holder = std::move(holder_); }
void addInterpreterContext(ContextPtr context) { holder.interpreter_context.emplace_back(std::move(context)); }
PipelineResourcesHolder detachResources() { return std::move(holder); }
void reset();
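
A minimal standalone sketch of the lifetime guarantee the new addInterpreterContext() gives (types reduced to bare shared pointers, not the real ClickHouse classes): contexts pushed into the holder stay alive until whoever detached the resources finally drops them, i.e. until the executed chain itself is destroyed.

#include <memory>
#include <utility>
#include <vector>

struct Context {};                                 /// stand-in for DB::Context
using ContextPtr = std::shared_ptr<const Context>; /// same ownership model as DB::ContextPtr

struct PipelineResourcesHolder
{
    std::vector<ContextPtr> interpreter_context;   /// keeps every attached context alive
};

struct Chain
{
    void addInterpreterContext(ContextPtr context) { holder.interpreter_context.emplace_back(std::move(context)); }
    PipelineResourcesHolder detachResources() { return std::move(holder); }
    PipelineResourcesHolder holder;
};

/// The interpreter can go out of scope right after building the chain,
/// because the chain now co-owns the context it was built with.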

View File

@ -46,7 +46,7 @@ struct AsyncDrainTask
std::shared_ptr<IConnections> shared_connections;
void operator()() const
{
ConnectionCollector::drainConnections(*shared_connections);
ConnectionCollector::drainConnections(*shared_connections, /* throw_error= */ false);
}
// We don't have std::unique_function yet. Wrap it in shared_ptr to make the functor copyable.
@ -71,7 +71,7 @@ std::shared_ptr<IConnections> ConnectionCollector::enqueueConnectionCleanup(
return connections;
}
void ConnectionCollector::drainConnections(IConnections & connections) noexcept
void ConnectionCollector::drainConnections(IConnections & connections, bool throw_error)
{
bool is_drained = false;
try
@ -90,6 +90,9 @@ void ConnectionCollector::drainConnections(IConnections & connections) noexcept
break;
default:
/// Connection should be closed in case of an unknown packet,
/// since this means that the connection is in a bad state.
is_drained = false;
throw Exception(
ErrorCodes::UNKNOWN_PACKET_FROM_SERVER,
"Unknown packet {} from one of the following replicas: {}",
@ -111,6 +114,9 @@ void ConnectionCollector::drainConnections(IConnections & connections) noexcept
tryLogCurrentException(&Poco::Logger::get("ConnectionCollector"), __PRETTY_FUNCTION__);
}
}
if (throw_error)
throw;
}
}

View File

@ -17,7 +17,7 @@ public:
static ConnectionCollector & init(ContextMutablePtr global_context_, size_t max_threads);
static std::shared_ptr<IConnections>
enqueueConnectionCleanup(const ConnectionPoolWithFailoverPtr & pool, std::shared_ptr<IConnections> connections) noexcept;
static void drainConnections(IConnections & connections) noexcept;
static void drainConnections(IConnections & connections, bool throw_error);
private:
explicit ConnectionCollector(ContextMutablePtr global_context_, size_t max_threads);

View File

@ -495,14 +495,26 @@ void RemoteQueryExecutor::finish(std::unique_ptr<ReadContext> * read_context)
/// Send the request to abort the execution of the request, if not already sent.
tryCancel("Cancelling query because enough data has been read", read_context);
/// Try to drain connections asynchronously.
if (auto conn = ConnectionCollector::enqueueConnectionCleanup(pool, connections))
if (context->getSettingsRef().drain_timeout != Poco::Timespan(-1000000))
{
/// Drain connections synchronously.
auto connections_left = ConnectionCollector::enqueueConnectionCleanup(pool, connections);
if (connections_left)
{
/// Drain connections synchronously and suppress errors.
CurrentMetrics::Increment metric_increment(CurrentMetrics::ActiveSyncDrainedConnections);
ConnectionCollector::drainConnections(*connections_left, /* throw_error= */ false);
CurrentMetrics::add(CurrentMetrics::SyncDrainedConnections, 1);
}
}
else
{
/// Drain connections synchronously w/o suppressing errors.
CurrentMetrics::Increment metric_increment(CurrentMetrics::ActiveSyncDrainedConnections);
ConnectionCollector::drainConnections(*conn);
ConnectionCollector::drainConnections(*connections, /* throw_error= */ true);
CurrentMetrics::add(CurrentMetrics::SyncDrainedConnections, 1);
}
finished = true;
}
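
A small aside on the magic number in the new condition, as a hedged sketch (the helper name is made up): the Seconds setting compares as a Poco::Timespan, and Poco::Timespan is constructed from microseconds, so Timespan(-1000000) is the "-1 second" sentinel that the new else-branch treats as "drain synchronously and propagate errors".

#include <Poco/Timespan.h>

/// Illustrative helper, not part of the source: true means "enqueue asynchronous draining
/// (with a silent synchronous fallback)", false means "drain synchronously and propagate errors".
inline bool drainInBackground(const Poco::Timespan & drain_timeout)
{
    return drain_timeout != Poco::Timespan(-1000000); /// -1 second, expressed in microseconds
}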

View File

@ -54,6 +54,7 @@ void ExternalDataSourceConfiguration::set(const ExternalDataSourceConfiguration
database = conf.database;
table = conf.table;
schema = conf.schema;
addresses = conf.addresses;
addresses_expr = conf.addresses_expr;
}

View File

@ -516,7 +516,7 @@ public:
virtual void shutdown() {}
/// Called before shutdown() to flush data to underlying storage
/// (for Buffer)
/// Data in memory needs to be persisted
virtual void flush() {}
/// Asks table to stop executing some action identified by action_type

View File

@ -280,7 +280,8 @@ StorageLiveView::StorageLiveView(
const StorageID & table_id_,
ContextPtr context_,
const ASTCreateQuery & query,
const ColumnsDescription & columns_)
const ColumnsDescription & columns_,
const String & comment)
: IStorage(table_id_)
, WithContext(context_->getGlobalContext())
{
@ -291,6 +292,9 @@ StorageLiveView::StorageLiveView(
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
if (!comment.empty())
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);
if (!query.select)
@ -621,7 +625,7 @@ void registerStorageLiveView(StorageFactory & factory)
"Experimental LIVE VIEW feature is not enabled (the setting 'allow_experimental_live_view')",
ErrorCodes::SUPPORT_IS_DISABLED);
return StorageLiveView::create(args.table_id, args.getLocalContext(), args.query, args.columns);
return StorageLiveView::create(args.table_id, args.getLocalContext(), args.query, args.columns, args.comment);
});
}

View File

@ -232,8 +232,8 @@ private:
const StorageID & table_id_,
ContextPtr context_,
const ASTCreateQuery & query,
const ColumnsDescription & columns
);
const ColumnsDescription & columns,
const String & comment);
};
}

View File

@ -1542,6 +1542,24 @@ void MergeTreeData::removePartsFinally(const MergeTreeData::DataPartsVector & pa
}
}
void MergeTreeData::flushAllInMemoryPartsIfNeeded()
{
if (getSettings()->in_memory_parts_enable_wal)
return;
auto metadata_snapshot = getInMemoryMetadataPtr();
DataPartsVector parts = getDataPartsVector();
for (const auto & part : parts)
{
if (auto part_in_memory = asInMemoryPart(part))
{
const auto & storage_relative_path = part_in_memory->storage.relative_data_path;
part_in_memory->flushToDisk(storage_relative_path, part_in_memory->relative_path, metadata_snapshot);
}
}
}
size_t MergeTreeData::clearOldPartsFromFilesystem(bool force)
{
DataPartsVector parts_to_remove = grabOldParts(force);

View File

@ -566,6 +566,9 @@ public:
/// Removes parts from data_parts, they should be in Deleting state
void removePartsFinally(const DataPartsVector & parts);
/// When WAL is not enabled, the InMemoryParts need to be persisted.
void flushAllInMemoryPartsIfNeeded();
/// Delete irrelevant parts from memory and disk.
/// If 'force' - don't wait for old_parts_lifetime.
size_t clearOldPartsFromFilesystem(bool force = false);

View File

@ -146,10 +146,15 @@ bool MergeTreeIndexhypothesisMergedCondition::mayBeTrueOnGranule(const MergeTree
values.push_back(granule->met);
}
if (const auto it = answer_cache.find(values); it != std::end(answer_cache))
return it->second;
const ComparisonGraph * graph = nullptr;
const auto & graph = getGraph(values);
{
std::lock_guard lock(cache_mutex);
if (const auto it = answer_cache.find(values); it != std::end(answer_cache))
return it->second;
graph = getGraph(values);
}
bool always_false = false;
expression_cnf->iterateGroups(
@ -166,7 +171,7 @@ bool MergeTreeIndexhypothesisMergedCondition::mayBeTrueOnGranule(const MergeTree
if (func && func->arguments->children.size() == 2)
{
const auto expected = ComparisonGraph::atomToCompareResult(atom);
if (graph.isPossibleCompare(expected, func->arguments->children[0], func->arguments->children[1]))
if (graph->isPossibleCompare(expected, func->arguments->children[0], func->arguments->children[1]))
{
/// If graph failed use matching.
/// We don't need to check constraints.
@ -177,6 +182,8 @@ bool MergeTreeIndexhypothesisMergedCondition::mayBeTrueOnGranule(const MergeTree
always_false = true;
});
std::lock_guard lock(cache_mutex);
answer_cache[values] = !always_false;
return !always_false;
}
@ -195,11 +202,13 @@ std::unique_ptr<ComparisonGraph> MergeTreeIndexhypothesisMergedCondition::buildG
return std::make_unique<ComparisonGraph>(active_atomic_formulas);
}
const ComparisonGraph & MergeTreeIndexhypothesisMergedCondition::getGraph(const std::vector<bool> & values) const
const ComparisonGraph * MergeTreeIndexhypothesisMergedCondition::getGraph(const std::vector<bool> & values) const
{
if (!graph_cache.contains(values))
graph_cache[values] = buildGraph(values);
return *graph_cache.at(values);
auto [it, inserted] = graph_cache.try_emplace(values);
if (inserted)
it->second = buildGraph(values);
return it->second.get();
}
}

View File

@ -21,11 +21,14 @@ public:
private:
void addConstraints(const ConstraintsDescription & constraints_description);
std::unique_ptr<ComparisonGraph> buildGraph(const std::vector<bool> & values) const;
const ComparisonGraph & getGraph(const std::vector<bool> & values) const;
const ComparisonGraph * getGraph(const std::vector<bool> & values) const;
ASTPtr expression_ast;
std::unique_ptr<CNFQuery> expression_cnf;
/// Part analysis can be done in parallel,
/// so the answer and graph caches are shared and guarded by a mutex.
mutable std::mutex cache_mutex;
mutable std::unordered_map<std::vector<bool>, std::unique_ptr<ComparisonGraph>> graph_cache;
mutable std::unordered_map<std::vector<bool>, bool> answer_cache;
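
A generic sketch of the locking discipline the hunks above introduce (container and value types here are simplified placeholders): consult or populate the caches only under the mutex, run the expensive graph evaluation outside it, then re-take the mutex to record the answer.

#include <map>
#include <memory>
#include <mutex>
#include <vector>

struct Graph { bool evaluate() const { return true; } }; /// stand-in for the comparison graph

class CachedCondition
{
public:
    bool check(const std::vector<bool> & key) const
    {
        const Graph * graph = nullptr;
        {
            std::lock_guard lock(cache_mutex);
            if (auto it = answer_cache.find(key); it != answer_cache.end())
                return it->second;                      /// already answered for this key
            auto [it, inserted] = graph_cache.try_emplace(key);
            if (inserted)
                it->second = std::make_unique<Graph>(); /// buildGraph(key) in the real code
            graph = it->second.get();
        }
        const bool answer = graph->evaluate();          /// expensive part runs without the lock
        std::lock_guard lock(cache_mutex);
        answer_cache[key] = answer;
        return answer;
    }

private:
    mutable std::mutex cache_mutex;
    mutable std::map<std::vector<bool>, std::unique_ptr<Graph>> graph_cache;
    mutable std::map<std::vector<bool>, bool> answer_cache;
};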

View File

@ -0,0 +1,26 @@
#include "MySQLHelpers.h"
#if USE_MYSQL
#include <mysqlxx/PoolWithFailover.h>
#include <Storages/ExternalDataSourceConfiguration.h>
#include <Storages/MySQL/MySQLSettings.h>
namespace DB
{
mysqlxx::PoolWithFailover
createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, const MySQLSettings & mysql_settings)
{
return mysqlxx::PoolWithFailover(
configuration.database, configuration.addresses, configuration.username, configuration.password,
MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS,
mysql_settings.connection_pool_size,
mysql_settings.connection_max_tries,
mysql_settings.connection_wait_timeout,
mysql_settings.connect_timeout,
mysql_settings.read_write_timeout);
}
}
#endif

View File

@ -0,0 +1,19 @@
#pragma once
#include "config_core.h"
#if USE_MYSQL
#include <Interpreters/Context_fwd.h>
namespace mysqlxx { class PoolWithFailover; }
namespace DB
{
struct StorageMySQLConfiguration;
struct MySQLSettings;
mysqlxx::PoolWithFailover
createMySQLPoolWithFailover(const StorageMySQLConfiguration & configuration, const MySQLSettings & mysql_settings);
}
#endif

View File

@ -19,6 +19,8 @@ class ASTStorage;
M(UInt64, connection_max_tries, 3, "Number of retries for pool with failover", 0) \
M(UInt64, connection_wait_timeout, 5, "Timeout (in seconds) for waiting for free connection (in case of there is already connection_pool_size active connections), 0 - do not wait.", 0) \
M(Bool, connection_auto_close, true, "Auto-close connection after query execution, i.e. disable connection reuse.", 0) \
M(UInt64, connect_timeout, DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, "Connect timeout (in seconds)", 0) \
M(UInt64, read_write_timeout, DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, "Read/write timeout (in seconds)", 0) \
DECLARE_SETTINGS_TRAITS(MySQLSettingsTraits, LIST_OF_MYSQL_SETTINGS)
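
A sketch of how the two new settings and the new helper fit together at the call sites in this commit (the wrapper function here is illustrative; StorageMySQL and TableFunctionMySQL do the equivalent inline): the per-query external_storage_* timeouts are copied into MySQLSettings before the failover pool is created.

#include <Core/Settings.h>
#include <Storages/ExternalDataSourceConfiguration.h>
#include <Storages/MySQL/MySQLHelpers.h>
#include <Storages/MySQL/MySQLSettings.h>
#include <mysqlxx/PoolWithFailover.h>

/// Illustrative wrapper, mirroring the call sites added in this commit.
mysqlxx::PoolWithFailover makeMySQLPool(const DB::StorageMySQLConfiguration & configuration, const DB::Settings & settings)
{
    DB::MySQLSettings mysql_settings;
    mysql_settings.connect_timeout = settings.external_storage_connect_timeout_sec;
    mysql_settings.read_write_timeout = settings.external_storage_rw_timeout_sec;
    return DB::createMySQLPoolWithFailover(configuration, mysql_settings);
}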

View File

@ -156,9 +156,6 @@ StoragePtr StorageFactory::get(
throw Exception("Unknown table engine " + name, ErrorCodes::UNKNOWN_STORAGE);
}
if (query.comment)
comment = query.comment->as<ASTLiteral &>().value.get<String>();
auto check_feature = [&](String feature_description, FeatureMatcherFn feature_matcher_fn)
{
if (!feature_matcher_fn(it->second.features))
@ -204,6 +201,9 @@ StoragePtr StorageFactory::get(
}
}
if (query.comment)
comment = query.comment->as<ASTLiteral &>().value.get<String>();
ASTs empty_engine_args;
Arguments arguments{
.engine_name = name,

View File

@ -60,7 +60,8 @@ StorageMaterializedView::StorageMaterializedView(
ContextPtr local_context,
const ASTCreateQuery & query,
const ColumnsDescription & columns_,
bool attach_)
bool attach_,
const String & comment)
: IStorage(table_id_), WithMutableContext(local_context->getGlobalContext())
{
StorageInMemoryMetadata storage_metadata;
@ -81,6 +82,9 @@ StorageMaterializedView::StorageMaterializedView(
auto select = SelectQueryDescription::getSelectQueryFromASTForMatView(query.select->clone(), local_context);
storage_metadata.setSelectQuery(select);
if (!comment.empty())
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);
bool point_to_itself_by_uuid = has_inner_table && query.to_inner_uuid != UUIDHelpers::Nil
@ -432,7 +436,7 @@ void registerStorageMaterializedView(StorageFactory & factory)
/// Pass local_context here to convey setting for inner table
return StorageMaterializedView::create(
args.table_id, args.getLocalContext(), args.query,
args.columns, args.attach);
args.columns, args.attach, args.comment);
});
}

View File

@ -108,7 +108,8 @@ protected:
ContextPtr local_context,
const ASTCreateQuery & query,
const ColumnsDescription & columns_,
bool attach_);
bool attach_,
const String & comment);
};
}

View File

@ -156,6 +156,10 @@ void StorageMergeTree::startup()
}
}
void StorageMergeTree::flush()
{
flushAllInMemoryPartsIfNeeded();
}
void StorageMergeTree::shutdown()
{

View File

@ -31,6 +31,7 @@ class StorageMergeTree final : public shared_ptr_helper<StorageMergeTree>, publi
friend struct shared_ptr_helper<StorageMergeTree>;
public:
void startup() override;
void flush() override;
void shutdown() override;
~StorageMergeTree() override;

View File

@ -4,6 +4,7 @@
#include <Storages/StorageFactory.h>
#include <Storages/transformQueryForExternalDatabase.h>
#include <Storages/MySQL/MySQLHelpers.h>
#include <Processors/Sources/MySQLSource.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Core/Settings.h>
@ -306,13 +307,7 @@ void registerStorageMySQL(StorageFactory & factory)
if (!mysql_settings.connection_pool_size)
throw Exception("connection_pool_size cannot be zero.", ErrorCodes::BAD_ARGUMENTS);
mysqlxx::PoolWithFailover pool(
configuration.database, configuration.addresses,
configuration.username, configuration.password,
MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS,
mysql_settings.connection_pool_size,
mysql_settings.connection_max_tries,
mysql_settings.connection_wait_timeout);
mysqlxx::PoolWithFailover pool = createMySQLPoolWithFailover(configuration, mysql_settings);
return StorageMySQL::create(
args.table_id,

View File

@ -527,7 +527,7 @@ inline void StorageWindowView::fire(UInt32 watermark)
for (auto & watch_stream : watch_streams)
{
if (auto watch_stream_ptr = watch_stream.lock())
watch_stream_ptr->addBlock(block);
watch_stream_ptr->addBlock(block, watermark);
}
}
if (!target_table_id.empty())
@ -910,7 +910,11 @@ Pipe StorageWindowView::watch(
}
auto reader = std::make_shared<WindowViewSource>(
*this, has_limit, limit,
std::static_pointer_cast<StorageWindowView>(shared_from_this()),
query.is_watch_events,
window_view_timezone,
has_limit,
limit,
local_context->getSettingsRef().window_view_heartbeat_interval.totalSeconds());
std::lock_guard lock(fire_signal_mutex);
@ -1077,7 +1081,8 @@ ASTPtr StorageWindowView::innerQueryParser(const ASTSelectQuery & query)
ErrorCodes::ILLEGAL_COLUMN,
"Illegal column #{} of time zone argument of function, must be constant string",
time_zone_arg_num);
time_zone = &DateLUT::instance(time_zone_ast->value.safeGet<String>());
window_view_timezone = time_zone_ast->value.safeGet<String>();
time_zone = &DateLUT::instance(window_view_timezone);
}
else
time_zone = &DateLUT::instance();
@ -1354,9 +1359,12 @@ Block & StorageWindowView::getHeader() const
sample_block = InterpreterSelectQuery(
select_query->clone(), window_view_context, getParentStorage(), nullptr,
SelectQueryOptions(QueryProcessingStage::Complete)).getSampleBlock();
/// convert all columns to full columns
/// in case some of them are constant
for (size_t i = 0; i < sample_block.columns(); ++i)
{
sample_block.safeGetByPosition(i).column = sample_block.safeGetByPosition(i).column->convertToFullColumnIfConst();
}
}
return sample_block;
}

View File

@ -210,6 +210,7 @@ private:
BackgroundSchedulePool::TaskHolder clean_cache_task;
BackgroundSchedulePool::TaskHolder fire_task;
String window_view_timezone;
String function_now_timezone;
ASTPtr innerQueryParser(const ASTSelectQuery & query);

View File

@ -11,83 +11,109 @@ class WindowViewSource : public SourceWithProgress
{
public:
WindowViewSource(
StorageWindowView & storage_,
std::shared_ptr<StorageWindowView> storage_,
const bool is_events_,
String window_view_timezone_,
const bool has_limit_,
const UInt64 limit_,
const UInt64 heartbeat_interval_sec_)
: SourceWithProgress(storage_.getHeader())
: SourceWithProgress(
is_events_ ? Block(
{ColumnWithTypeAndName(ColumnUInt32::create(), std::make_shared<DataTypeDateTime>(window_view_timezone_), "watermark")})
: storage_->getHeader())
, storage(storage_)
, is_events(is_events_)
, window_view_timezone(window_view_timezone_)
, has_limit(has_limit_)
, limit(limit_)
, heartbeat_interval_sec(heartbeat_interval_sec_) {}
, heartbeat_interval_sec(heartbeat_interval_sec_)
{
if (is_events)
header.insert(
ColumnWithTypeAndName(ColumnUInt32::create(), std::make_shared<DataTypeDateTime>(window_view_timezone_), "watermark"));
else
header = storage->getHeader();
}
String getName() const override { return "WindowViewSource"; }
void addBlock(Block block_)
void addBlock(Block block_, UInt32 watermark)
{
std::lock_guard lock(blocks_mutex);
blocks.push_back(std::move(block_));
blocks_with_watermark.push_back({std::move(block_), watermark});
}
protected:
Block getHeader() const { return storage.getHeader(); }
Block getHeader() const { return header; }
Chunk generate() override
{
auto block = generateImpl();
return Chunk(block.getColumns(), block.rows());
Block block;
UInt32 watermark;
std::tie(block, watermark) = generateImpl();
if (is_events)
{
return Chunk(
{DataTypeDateTime(window_view_timezone).createColumnConst(block.rows(), watermark)->convertToFullColumnIfConst()},
block.rows());
}
else
{
return Chunk(block.getColumns(), block.rows());
}
}
Block generateImpl()
std::pair<Block, UInt32> generateImpl()
{
Block res;
if (has_limit && num_updates == static_cast<Int64>(limit))
return Block();
return {Block(), 0};
if (isCancelled() || storage.shutdown_called)
return Block();
if (isCancelled() || storage->shutdown_called)
return {Block(), 0};
std::unique_lock lock(blocks_mutex);
if (blocks.empty())
if (blocks_with_watermark.empty())
{
if (!end_of_blocks)
{
end_of_blocks = true;
num_updates += 1;
return getHeader();
return {getHeader(), 0};
}
storage.fire_condition.wait_for(lock, std::chrono::seconds(heartbeat_interval_sec));
storage->fire_condition.wait_for(lock, std::chrono::seconds(heartbeat_interval_sec));
if (isCancelled() || storage.shutdown_called)
if (isCancelled() || storage->shutdown_called)
{
return Block();
return {Block(), 0};
}
if (blocks.empty())
return getHeader();
if (blocks_with_watermark.empty())
return {getHeader(), 0};
else
{
end_of_blocks = false;
res = blocks.front();
blocks.pop_front();
auto res = blocks_with_watermark.front();
blocks_with_watermark.pop_front();
return res;
}
}
else
{
res = blocks.front();
blocks.pop_front();
auto res = blocks_with_watermark.front();
blocks_with_watermark.pop_front();
return res;
}
}
private:
StorageWindowView & storage;
std::shared_ptr<StorageWindowView> storage;
BlocksList blocks;
std::list<std::pair<Block, UInt32>> blocks_with_watermark;
Block header;
const bool is_events;
String window_view_timezone;
const bool has_limit;
const UInt64 limit;
Int64 num_updates = -1;

View File

@ -8,6 +8,7 @@
#include <Parsers/ASTFunction.h>
#include <Storages/StorageMySQL.h>
#include <Storages/MySQL/MySQLSettings.h>
#include <Storages/MySQL/MySQLHelpers.h>
#include <TableFunctions/ITableFunction.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/TableFunctionMySQL.h>
@ -37,7 +38,11 @@ void TableFunctionMySQL::parseArguments(const ASTPtr & ast_function, ContextPtr
throw Exception("Table function 'mysql' must have arguments.", ErrorCodes::LOGICAL_ERROR);
configuration = StorageMySQL::getConfiguration(args_func.arguments->children, context);
pool.emplace(configuration->database, configuration->addresses, configuration->username, configuration->password);
MySQLSettings mysql_settings;
const auto & settings = context->getSettingsRef();
mysql_settings.connect_timeout = settings.external_storage_connect_timeout_sec;
mysql_settings.read_write_timeout = settings.external_storage_rw_timeout_sec;
pool.emplace(createMySQLPoolWithFailover(*configuration, mysql_settings));
}
ColumnsDescription TableFunctionMySQL::getActualTableStructure(ContextPtr context) const

View File

@ -76,15 +76,23 @@ def get_image_name(build_config):
return 'clickhouse/deb-builder'
def build_clickhouse(packager_cmd, logs_path):
def build_clickhouse(packager_cmd, logs_path, build_output_path):
build_log_path = os.path.join(logs_path, 'build_log.log')
with TeePopen(packager_cmd, build_log_path) as process:
retcode = process.wait()
if os.path.exists(build_output_path):
build_results = os.listdir(build_output_path)
else:
build_results = []
if retcode == 0:
logging.info("Built successfully")
if len(build_results) != 0:
logging.info("Built successfully")
else:
logging.info("Success exit code, but no build artifacts => build failed")
else:
logging.info("Build failed")
return build_log_path, retcode == 0
return build_log_path, retcode == 0 and len(build_results) > 0
def get_build_results_if_exists(s3_helper, s3_prefix):
@ -136,8 +144,10 @@ if __name__ == "__main__":
if 'release' in pr_info.labels or 'release-lts' in pr_info.labels:
# for release pull requests we use branch names prefixes, not pr numbers
release_or_pr = pr_info.head_ref
elif pr_info.number == 0:
# for pushes to master - major version
elif pr_info.number == 0 and build_config['package_type'] != "performance":
# for pushes to master - major version, but not for performance builds
# they heavily rely on a fixed path for the build package and nobody is going
# to deploy them anywhere, so it's ok.
release_or_pr = ".".join(version.as_tuple()[:2])
else:
# PR number for anything else
@ -157,7 +167,7 @@ if __name__ == "__main__":
log_url = 'https://s3.amazonaws.com/clickhouse-builds/' + url.replace('+', '%2B').replace(' ', '%20')
else:
build_urls.append('https://s3.amazonaws.com/clickhouse-builds/' + url.replace('+', '%2B').replace(' ', '%20'))
create_json_artifact(temp_path, build_name, log_url, build_urls, build_config, 0, True)
create_json_artifact(temp_path, build_name, log_url, build_urls, build_config, 0, len(build_urls) > 0)
sys.exit(0)
image_name = get_image_name(build_config)
@ -201,7 +211,7 @@ if __name__ == "__main__":
os.makedirs(build_clickhouse_log)
start = time.time()
log_path, success = build_clickhouse(packager_cmd, build_clickhouse_log)
log_path, success = build_clickhouse(packager_cmd, build_clickhouse_log, build_output_path)
elapsed = int(time.time() - start)
subprocess.check_call(f"sudo chown -R ubuntu:ubuntu {build_output_path}", shell=True)
subprocess.check_call(f"sudo chown -R ubuntu:ubuntu {ccache_path}", shell=True)

View File

@ -25,7 +25,7 @@ class BuildResult():
self.with_coverage = with_coverage
def group_by_artifacts(build_urls):
groups = {'deb': [], 'binary': [], 'tgz': [], 'rpm': [], 'preformance': []}
groups = {'deb': [], 'binary': [], 'tgz': [], 'rpm': [], 'performance': []}
for url in build_urls:
if url.endswith('performance.tgz'):
groups['performance'].append(url)

View File

@ -11,7 +11,7 @@ CI_CONFIG = {
"splitted": "unsplitted",
"alien_pkgs": True,
"tidy": "disable",
"with_coverage": False
"with_coverage": False,
},
"performance": {
"compiler": "clang-13",
@ -21,7 +21,7 @@ CI_CONFIG = {
"bundled": "bundled",
"splitted": "unsplitted",
"tidy": "disable",
"with_coverage": False
"with_coverage": False,
},
"binary_gcc": {
"compiler": "gcc-11",
@ -31,7 +31,18 @@ CI_CONFIG = {
"bundled": "bundled",
"splitted": "unsplitted",
"tidy": "disable",
"with_coverage": False
"with_coverage": False,
},
"package_aarch64": {
"compiler": "clang-13-aarch64",
"build_type": "",
"sanitizer": "",
"package_type": "deb",
"bundled": "bundled",
"splitted": "unsplitted",
"alien_pkgs": True,
"tidy": "disable",
"with_coverage": False,
},
"package_asan": {
"compiler": "clang-13",
@ -41,7 +52,7 @@ CI_CONFIG = {
"bundled": "bundled",
"splitted": "unsplitted",
"tidy": "disable",
"with_coverage": False
"with_coverage": False,
},
"package_ubsan": {
"compiler": "clang-13",
@ -51,7 +62,7 @@ CI_CONFIG = {
"bundled": "bundled",
"splitted": "unsplitted",
"tidy": "disable",
"with_coverage": False
"with_coverage": False,
},
"package_tsan": {
"compiler": "clang-13",
@ -61,7 +72,7 @@ CI_CONFIG = {
"bundled": "bundled",
"splitted": "unsplitted",
"tidy": "disable",
"with_coverage": False
"with_coverage": False,
},
"package_msan": {
"compiler": "clang-13",
@ -71,7 +82,7 @@ CI_CONFIG = {
"bundled": "bundled",
"splitted": "unsplitted",
"tidy": "disable",
"with_coverage": False
"with_coverage": False,
},
"package_debug": {
"compiler": "clang-13",
@ -81,7 +92,7 @@ CI_CONFIG = {
"bundled": "bundled",
"splitted": "unsplitted",
"tidy": "disable",
"with_coverage": False
"with_coverage": False,
},
"binary_release": {
"compiler": "clang-13",
@ -91,7 +102,7 @@ CI_CONFIG = {
"bundled": "bundled",
"splitted": "unsplitted",
"tidy": "disable",
"with_coverage": False
"with_coverage": False,
},
"binary_tidy": {
"compiler": "clang-13",
@ -101,7 +112,7 @@ CI_CONFIG = {
"bundled": "bundled",
"splitted": "unsplitted",
"tidy": "enable",
"with_coverage": False
"with_coverage": False,
},
"binary_splitted": {
"compiler": "clang-13",
@ -111,7 +122,7 @@ CI_CONFIG = {
"bundled": "bundled",
"splitted": "splitted",
"tidy": "disable",
"with_coverage": False
"with_coverage": False,
},
"binary_darwin": {
"compiler": "clang-13-darwin",
@ -121,7 +132,7 @@ CI_CONFIG = {
"bundled": "bundled",
"splitted": "unsplitted",
"tidy": "disable",
"with_coverage": False
"with_coverage": False,
},
"binary_aarch64": {
"compiler": "clang-13-aarch64",
@ -131,7 +142,7 @@ CI_CONFIG = {
"bundled": "bundled",
"splitted": "unsplitted",
"tidy": "disable",
"with_coverage": False
"with_coverage": False,
},
"binary_freebsd": {
"compiler": "clang-13-freebsd",
@ -141,7 +152,7 @@ CI_CONFIG = {
"bundled": "bundled",
"splitted": "unsplitted",
"tidy": "disable",
"with_coverage": False
"with_coverage": False,
},
"binary_darwin_aarch64": {
"compiler": "clang-13-darwin-aarch64",
@ -151,7 +162,7 @@ CI_CONFIG = {
"bundled": "bundled",
"splitted": "unsplitted",
"tidy": "disable",
"with_coverage": False
"with_coverage": False,
},
"binary_ppc64le": {
"compiler": "clang-13-ppc64le",
@ -161,27 +172,29 @@ CI_CONFIG = {
"bundled": "bundled",
"splitted": "unsplitted",
"tidy": "disable",
"with_coverage": False
}
"with_coverage": False,
},
},
"builds_report_config": {
"ClickHouse build check (actions)": [
"package_release",
"performance",
"package_aarch64",
"package_asan",
"package_ubsan",
"package_tsan",
"package_msan",
"package_debug",
"binary_release"
"binary_release",
],
"ClickHouse special build check (actions)": [
"binary_tidy",
"binary_splitted",
"binary_darwin",
"binary_arrach64",
"binary_aarch64",
"binary_freebsd",
"binary_darwin_aarch64"
"binary_darwin_aarch64",
"binary_ppc64le",
],
},
"tests_config": {
@ -319,6 +332,6 @@ CI_CONFIG = {
},
"Performance Comparison (actions)": {
"required_build": "performance",
}
}
},
},
}

View File

@ -2,7 +2,6 @@
import logging
import subprocess
import os
import sys
from github import Github
@ -25,13 +24,6 @@ if __name__ == "__main__":
pr_info = PRInfo(get_event(), need_changed_files=True)
gh = Github(get_best_robot_token())
if not pr_info.has_changes_in_documentation():
logging.info ("No changes in documentation")
commit = get_commit(gh, pr_info.sha)
commit.create_status(context=NAME, description="No changes in docs", state="success")
sys.exit(0)
logging.info("Has changes in docs")
if not os.path.exists(temp_path):
os.makedirs(temp_path)

View File

@ -34,6 +34,7 @@ TRUSTED_CONTRIBUTORS = {e.lower() for e in [
"bobrik", # Seasoned contributor, CloundFlare
"BohuTANG",
"codyrobert", # Flickerbox engineer
"cwurm", # Employee
"damozhaeva", # DOCSUP
"den-crane",
"flickerbox-tom", # Flickerbox
@ -65,6 +66,7 @@ TRUSTED_CONTRIBUTORS = {e.lower() for e in [
"vzakaznikov",
"YiuRULE",
"zlobober", # Developer of YT
"ilejn", # Arenadata, responsible for Kerberized Kafka
]}

View File

@ -0,0 +1,13 @@
FROM public.ecr.aws/lambda/python:3.9
# Install the function's dependencies using file requirements.txt
# from your project folder.
COPY requirements.txt .
RUN pip3 install -r requirements.txt --target "${LAMBDA_TASK_ROOT}"
# Copy function code
COPY app.py ${LAMBDA_TASK_ROOT}
# Set the CMD to your handler (could also be done as a parameter override outside of the Dockerfile)
CMD [ "app.handler" ]

View File

@ -0,0 +1,108 @@
#!/usr/bin/env python3
import requests
import argparse
import json
from threading import Thread
from queue import Queue
def get_org_team_members(token: str, org: str, team_slug: str) -> tuple:
headers = {
"Authorization": f"token {token}",
"Accept": "application/vnd.github.v3+json",
}
response = requests.get(
f"https://api.github.com/orgs/{org}/teams/{team_slug}/members", headers=headers
)
response.raise_for_status()
data = response.json()
return tuple(m["login"] for m in data)
def get_members_keys(members: tuple) -> str:
class Worker(Thread):
def __init__(self, request_queue):
Thread.__init__(self)
self.queue = request_queue
self.results = []
def run(self):
while True:
m = self.queue.get()
if m == "":
break
response = requests.get(f"https://github.com/{m}.keys")
self.results.append(f"# {m}\n{response.text}")
self.queue.task_done()
q = Queue()
workers = []
for m in members:
q.put(m)
# Create workers and add to the queue
worker = Worker(q)
worker.start()
workers.append(worker)
# Workers keep working till they receive an empty string
for _ in workers:
q.put("")
# Join workers to wait till they finish
for worker in workers:
worker.join()
responses = []
for worker in workers:
responses.extend(worker.results)
return "".join(responses)
def get_token_from_aws() -> str:
import boto3
secret_name = "clickhouse_robot_token"
session = boto3.session.Session()
client = session.client(
service_name="secretsmanager",
)
get_secret_value_response = client.get_secret_value(SecretId=secret_name)
data = json.loads(get_secret_value_response["SecretString"])
return data["clickhouse_robot_token"]
def main(token: str, org: str, team_slug: str) -> str:
members = get_org_team_members(token, org, team_slug)
keys = get_members_keys(members)
return keys
def handler(event, context):
token = get_token_from_aws()
result = {
"statusCode": 200,
"headers": {
"Content-Type": "text/html",
},
"body": main(token, "ClickHouse", "core"),
}
return result
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Get the public SSH keys for members of given org and team"
)
parser.add_argument("--token", required=True, help="Github PAT")
parser.add_argument(
"--organization", help="GitHub organization name", default="ClickHouse"
)
parser.add_argument("--team", help="GitHub team name", default="core")
args = parser.parse_args()
keys = main(args.token, args.organization, args.team)
print(f"Just showing off the keys:\n{keys}")

View File

@ -0,0 +1 @@
requests

View File

@ -1,34 +0,0 @@
#!/usr/bin/env bash
set -uo pipefail
echo "Running init script"
export DEBIAN_FRONTEND=noninteractive
export RUNNER_HOME=/home/ubuntu/actions-runner
export RUNNER_URL="https://github.com/ClickHouse"
# Funny fact, but metadata service has fixed IP
export INSTANCE_ID=`curl -s http://169.254.169.254/latest/meta-data/instance-id`
while true; do
runner_pid=`pgrep run.sh`
echo "Got runner pid $runner_pid"
cd $RUNNER_HOME
if [ -z "$runner_pid" ]; then
echo "Receiving token"
RUNNER_TOKEN=`/usr/local/bin/aws ssm get-parameter --name github_runner_registration_token --with-decryption --output text --query Parameter.Value`
echo "Will try to remove runner"
sudo -u ubuntu ./config.sh remove --token $RUNNER_TOKEN ||:
echo "Going to configure runner"
sudo -u ubuntu ./config.sh --url $RUNNER_URL --token $RUNNER_TOKEN --name $INSTANCE_ID --runnergroup Default --labels 'self-hosted,Linux,X64,builder' --work _work
echo "Run"
sudo -u ubuntu ./run.sh &
sleep 15
else
echo "Runner is working with pid $runner_pid, nothing to do"
sleep 10
fi
done

Some files were not shown because too many files have changed in this diff.