Merge remote-tracking branch 'blessed/master' into backup_1

Raúl Marín 2023-11-22 10:35:18 +01:00
commit 281060329f
154 changed files with 2074 additions and 514 deletions

View File

@ -21,8 +21,11 @@ include (cmake/clang_tidy.cmake)
include (cmake/git.cmake)
include (cmake/utils.cmake)
# This is needed to set up the CMAKE_INSTALL_BINDIR variable.
include (GNUInstallDirs)
# Ignore export() since we don't use it,
# but it gets broken with a global targets via link_libraries()
# but it gets broken with global targets via link_libraries()
macro (export)
endmacro ()
@ -460,14 +463,6 @@ endif ()
message (STATUS "Building for: ${CMAKE_SYSTEM} ${CMAKE_SYSTEM_PROCESSOR} ${CMAKE_LIBRARY_ARCHITECTURE}")
include (GNUInstallDirs)
# When testing for memory leaks with Valgrind, don't link tcmalloc or jemalloc.
if (TARGET global-group)
install (EXPORT global DESTINATION cmake)
endif ()
add_subdirectory (contrib EXCLUDE_FROM_ALL)
if (NOT ENABLE_JEMALLOC)

View File

@ -35,12 +35,6 @@ if (GLIBC_COMPATIBILITY)
target_link_libraries(global-libs INTERFACE glibc-compatibility ${MEMCPY_LIBRARY})
install(
TARGETS glibc-compatibility ${MEMCPY_LIBRARY}
EXPORT global
ARCHIVE DESTINATION lib
)
message (STATUS "Some symbols from glibc will be replaced for compatibility")
elseif (CLICKHOUSE_OFFICIAL_BUILD)

View File

@ -1,2 +1 @@
add_library(harmful harmful.c)
install(TARGETS harmful EXPORT global ARCHIVE DESTINATION lib)

View File

@ -22,9 +22,3 @@ link_libraries(global-group)
target_link_libraries(global-group INTERFACE
$<TARGET_PROPERTY:global-libs,INTERFACE_LINK_LIBRARIES>
)
# FIXME: remove when all contribs will get custom cmake lists
install(
TARGETS global-group global-libs
EXPORT global
)

View File

@ -25,9 +25,3 @@ link_libraries(global-group)
target_link_libraries(global-group INTERFACE
$<TARGET_PROPERTY:global-libs,INTERFACE_LINK_LIBRARIES>
)
# FIXME: remove when all contribs will get custom cmake lists
install(
TARGETS global-group global-libs
EXPORT global
)

View File

@ -50,9 +50,3 @@ target_link_libraries(global-group INTERFACE
$<TARGET_PROPERTY:global-libs,INTERFACE_LINK_LIBRARIES>
-Wl,--end-group
)
# FIXME: remove when all contribs will get custom cmake lists
install(
TARGETS global-group global-libs
EXPORT global
)

contrib/grpc vendored (2 changes)

@ -1 +1 @@
Subproject commit 740e3dfd97301a52ad8165b65285bcc149d9e817
Subproject commit 77b2737a709d43d8c6895e3f03ca62b00bd9201c

View File

@ -61,6 +61,9 @@ set (REQUIRED_LLVM_LIBRARIES
LLVMDemangle
)
# Skip useless "install" instructions from CMake:
set (LLVM_INSTALL_TOOLCHAIN_ONLY 1 CACHE INTERNAL "")
if (ARCH_AMD64)
set (LLVM_TARGETS_TO_BUILD "X86" CACHE INTERNAL "")
list(APPEND REQUIRED_LLVM_LIBRARIES LLVMX86Info LLVMX86Desc LLVMX86CodeGen)

View File

@ -9,4 +9,16 @@ cd $GIT_DIR
contrib/sparse-checkout/setup-sparse-checkout.sh
git submodule init
git submodule sync
git config --file .gitmodules --get-regexp .*path | sed 's/[^ ]* //' | xargs -I _ --max-procs 64 git submodule update --depth=1 --single-branch _
# NOTE: do not use --remote for the `git submodule update`[1] command, since each submodule references a specific commit SHA1 in the subproject.
# Using --remote may cause unexpected behavior. Instead, you need to commit a new SHA1 for the submodule.
#
# [1] - https://git-scm.com/book/en/v2/Git-Tools-Submodules
git config --file .gitmodules --get-regexp '.*path' | sed 's/[^ ]* //' | xargs -I _ --max-procs 64 git submodule update --depth=1 --single-branch _
# We don't want to depend on any third-party CMake files.
# To check it, find and delete them.
grep -o -P '"contrib/[^"]+"' .gitmodules |
grep -v -P 'contrib/(llvm-project|google-protobuf|grpc|abseil-cpp|corrosion)' |
xargs -I@ find @ \
-'(' -name 'CMakeLists.txt' -or -name '*.cmake' -')' -and -not -name '*.h.cmake' \
-delete

View File

@ -34,15 +34,6 @@ cd /build/build_docker
rm -f CMakeCache.txt
# We don't want to depend on any third-party CMake files.
# To check it, find and delete them.
grep -o -P '"contrib/[^"]+"' ../.gitmodules |
grep -v -P 'llvm-project|google-protobuf|grpc|abseil-cpp|corrosion' |
xargs -I@ find ../@ -'(' -name 'CMakeLists.txt' -or -name '*.cmake' -')' -and -not -name '*.h.cmake' |
xargs rm
if [ -n "$MAKE_DEB" ]; then
rm -rf /build/packages/root
# NOTE: this is for backward compatibility with previous releases,

View File

@ -23,11 +23,6 @@
<max>10G</max>
</max_memory_usage>
<!-- Analyzer is unstable, not ready for testing. -->
<allow_experimental_analyzer>
<readonly/>
</allow_experimental_analyzer>
<table_function_remote_max_addresses>
<max>200</max>
</table_function_remote_max_addresses>

View File

@ -6,9 +6,13 @@ services:
hostname: rabbitmq1
expose:
- ${RABBITMQ_PORT:-5672}
- ${RABBITMQ_SECURE_PORT:-5671}
volumes:
- type: ${RABBITMQ_LOGS_FS:-tmpfs}
source: ${RABBITMQ_LOGS:-}
target: /rabbitmq_logs/
- "${RABBITMQ_COOKIE_FILE}:/var/lib/rabbitmq/.erlang.cookie"
- /misc/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
- /misc/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
- /misc/rabbitmq/ca-cert.pem:/etc/rabbitmq/ca-cert.pem
- /misc/rabbitmq/server-cert.pem:/etc/rabbitmq/server-cert.pem
- /misc/rabbitmq/server-key.pem:/etc/rabbitmq/server-key.pem

View File

@ -1,8 +0,0 @@
loopback_users.guest = false
listeners.tcp.default = 5672
default_pass = clickhouse
default_user = root
management.tcp.port = 15672
log.file = /rabbitmq_logs/rabbit.log
log.file.level = debug

View File

@ -0,0 +1,32 @@
-----BEGIN CERTIFICATE-----
MIIFhTCCA22gAwIBAgIUWhfjFfbwannH3KIqITDtgcvSItMwDQYJKoZIhvcNAQEL
BQAwUjELMAkGA1UEBhMCUlUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDELMAkGA1UEAwwCY2EwHhcNMjMxMTE0
MTgyODI2WhcNMzMxMTExMTgyODI2WjBSMQswCQYDVQQGEwJSVTETMBEGA1UECAwK
U29tZS1TdGF0ZTEhMB8GA1UECgwYSW50ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMQsw
CQYDVQQDDAJjYTCCAiIwDQYJKoZIhvcNAQEBBQADggIPADCCAgoCggIBAJfJegdC
gavNGYzSdva+5QMxGvqyLwZzjophMeyEzlW/Di4KFGPho+fVlVMB/EwaTRoBRLEu
SQusQwoFg71mGvUTOpgHzlsUz4vcVVFOiL4bJdzCWQKzdC8M8rUFoks9FMboVeSx
jhAnKAm/NpCLpm9VYnRjEq2KEbJp7VkPAHgZEXR7VABwCFvmDcztrfcWfmXxm6IH
o+AkF/nqdphLu7Q1yDQiF8Q8TuszuhqgQ7/1PrRcaSADrF15jJjQb05sILpGCT3e
lxJYId5RF0+fgTIqy03bAKB53+8V8cAkowI4rvPTmcFXhcG3rkDO6lyZixHhlpKi
PmXEzHh0kfsRjzkNBP0CKqPnu3D2iymROiPAH2cteaYe6jdD2HIjuVLk/TjX1ZFy
DlZCrJIwj0l8A2xAfLq8Gw5RSr0a9k5TiMD5nZtfd12Vd0K82vO32vmcjO2Igddc
VWccDDwUY/ZWV3uznkusOBrB8wba3ZsXA5hjJzs0KlTvQKPjX0y4lFMmZGbelwjt
pR5dRNLi5XTdMPzV0mAnvJhDTFEmME19Bh6AEsjuAz3gHUdwNTbSxUS3mF/hTL9k
v2wh5udUAOwqD1uEzqPJyG4JCJQozIDOEEZVixWqQ60b9wUHN8meqO4y9fxTdmHW
Vo5BAF1xEJhJJb0QY/O6GahPtWqb/Mr1rtPJAgMBAAGjUzBRMB0GA1UdDgQWBBSw
fQcOabXwX/v9F1hd2cmuIug56jAfBgNVHSMEGDAWgBSwfQcOabXwX/v9F1hd2cmu
Iug56jAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4ICAQAms8y6RVxl
mKSUbsU8JscYwOzcRUQJWETeIr4rtZvMHH+3vkdBU0yKxGpEm7U8J3+5oVTYPhbs
11ZAL+DvIZ6gT6pjDvECyVox1OkjNogz843fTMbNqjuuehjSKXwpMTy5/kmT2aLj
//nBi5UX1xo3RQ9vtmBwzZ3VFK99DFXraDOPS/yk43WV2uqdWsXCNvyEyCHmM1IB
9FQe2EFcO6s4/N+TarhIZ8Udhj5bl8d4eDd1yEckmTD4aHJBgMII2uEwrAxR5CT1
tCqUKutvNrkXI5PIULvmy+Lwm7PJAC7grPtUHK6anSugpljd7bFj18fHH9APiC45
Ou4OOK1BUZogCEo7rD36UlanxQO0GEzgDCVEoEdoe0WRdc6T9b4fM8vpQqwBdf9t
nkPB8oLCKerqqYwCiMuWm4BcRmExA7ypIkUCcluGO9/kTmdps3NqOvET9oLTjXuA
z5TPmaK5a3poKLoxBfv6WfRTgisOnMNTsjL1R8+xuhEn5hSlE2r3wAi8Cys9Z9PV
LhTj0SRTXILd2NW3lO8QfO0pGdjgk90GqkyUY9YjuiMVPvdUAFQsHm+0GEZEXjOD
Bw7tLSJQ4IKhfactg/Puxd15ahcWAxeelyED+w/zVGdHYblqbvfdtiGj370KVhoj
DL5HkdPa0IhTPqMBnmoVQ4C/WzKofXBjQQ==
-----END CERTIFICATE-----

View File

@ -0,0 +1,10 @@
#!/bin/bash
# 1. Generate CA's private key and self-signed certificate
openssl req -newkey rsa:4096 -x509 -days 3650 -nodes -batch -keyout ca-key.pem -out ca-cert.pem -subj "/C=RU/ST=Some-State/O=Internet Widgits Pty Ltd/CN=ca"
# 2. Generate server's private key and certificate signing request (CSR)
openssl req -newkey rsa:4096 -nodes -batch -keyout server-key.pem -out server-req.pem -subj "/C=RU/ST=Some-State/O=Internet Widgits Pty Ltd/CN=server"
# 3. Use CA's private key to sign server's CSR and get back the signed certificate
openssl x509 -req -days 3650 -in server-req.pem -CA ca-cert.pem -CAkey ca-key.pem -CAcreateserial -extfile server-ext.cnf -out server-cert.pem

View File

@ -0,0 +1,15 @@
loopback_users.guest = false
listeners.tcp.default = 5672
default_pass = clickhouse
default_user = root
management.tcp.port = 15672
log.file = /rabbitmq_logs/rabbit.log
log.file.level = debug
listeners.ssl.default = 5671
ssl_options.verify = verify_none
ssl_options.fail_if_no_peer_cert = false
ssl_options.cacertfile = /etc/rabbitmq/ca-cert.pem
ssl_options.certfile = /etc/rabbitmq/server-cert.pem
ssl_options.keyfile = /etc/rabbitmq/server-key.pem

View File

@ -0,0 +1,33 @@
-----BEGIN CERTIFICATE-----
MIIFpTCCA42gAwIBAgIUJvQslezZO09XgFGQCxOM6orIsWowDQYJKoZIhvcNAQEL
BQAwUjELMAkGA1UEBhMCUlUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDELMAkGA1UEAwwCY2EwHhcNMjMxMTE0
MTgyODI5WhcNMzMxMTExMTgyODI5WjBWMQswCQYDVQQGEwJSVTETMBEGA1UECAwK
U29tZS1TdGF0ZTEhMB8GA1UECgwYSW50ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMQ8w
DQYDVQQDDAZzZXJ2ZXIwggIiMA0GCSqGSIb3DQEBAQUAA4ICDwAwggIKAoICAQCe
o/K71WdKpVpdDvhaZy6wBVhFlu7j7DhfTSYvcPpAJfExmzO8JK3vh5/yGyAO1t79
gAjqyXLMCZKw7ajM2rez9YnGYqaFi70BlTcU2KQ8LbFEYRc3cYNDmmWIKBpwpSri
We5SQrRLnDXqAn6T8FG5ejQ/t+1IUMrtZENB4lp8fBmEOJb5yr1TE++6EhiDBQho
cLDWWWP8b55kyZhqP/VgmId4lvboGMRKxbiRJ6/SPr/i/pteBD8jTYfbJr6ceXov
/p5yxIp61z5ry1anU7W3B8jTl/gj7SqtFdSnRajZ0DGJJAUKpiiJSCSlp5YB5Ub2
eBBMHmdA5R1MuiU9TOA35nUW5wkhEOJXnBR/WCsYioVmn/+5dm6JPYiwp/TefYnr
x9iLbb/Tyx7MnXzeyvKg781SwmnvS6Blhtr0zhAW9szZz8cVHPBqFs6PzGs/5mwE
C+tM3Zp85aHd28nIT4NQLHdMDwVmGwmPdy4uavtYWMDhsuIyEU8hCZymiHhPnuHU
VbmfZ8GOTIzUgQAvZb0fL1Xow2Tf6XuARnvuU9weRttg9jSOqPuUENRsFXv0mU8M
EpQjrxry88Wfz7bBEjN5JHC16PB/Nu7zTGJ4/slThbxNv0bIONzvTBPbXrKnxw7Z
d9WhGJI+LQxRqLTynQe6yzDwIuW9LRdBNTp7CtQRwQIDAQABo28wbTArBgNVHREE
JDAigiBpbnRlZ3JhdGlvbi10ZXN0cy5jbGlja2hvdXNlLmNvbTAdBgNVHQ4EFgQU
54GvBUYWvMADpTz/zglwMlaJuskwHwYDVR0jBBgwFoAUsH0HDmm18F/7/RdYXdnJ
riLoOeowDQYJKoZIhvcNAQELBQADggIBADfNH6O6ay+xg0XmV6sR0n4j6PwL9Cnc
VjuCmHQbpFXfMvgCdfHvbtT0Y/pG7IoeKmrrm0JPvKa2E9Ht0j6ZnowQ2m9mJk8U
5Fd/PbC1I4KgVCw6HRSOcwqANJxOGe7RyN9PTZZ8fxzmzIR3FiQ2bXfr+LaotZOK
aVS8F8xCOzoMvL9LFls2YpEn20p/1EATIf2MFX3j9vKfcJVOyDJV4i5BMImStFLM
g3sdC96de/59yxt9khM0PNucU1ldNFs/kZVEcNSwGOAIgQEPwULJtDY+ZSWeROpX
EpWndN6zQsv1pdNvLtXsDXfi4YoH9QVaA/k4aFFJ08CjSZfMYmwyPOGsf/wqT65i
ADID2yb1A/FIIe/fM+d2gXHBVFBDmydJ1JCdCoYrEJgfWj1LO/0jLi34ZZ17Hu7F
D33fLARF9nlLzlUiWjcQlOjNoCM48AgG/3wHk4eiSfc/3PIJDuDGDa0NdtDeKKhH
XkP2ll4cMUH6EQ9KO1jHPmf5RokX4QJgH+ofO4U5XQFwc3lOyJzEQnED+wame7do
R7TE4F/OXhxLqA6DFkzXe89/kSCoAF9bjzmUn/ilrg8NXKKgprgHg4DJHgvCQVVC
34ab7Xj7msUm4D9vI+GAeUbUqnqCaWxDF6vCMT0Qq7iSVDxa/SV8TX8Vp2Zh+PSh
4m23Did+KjLq
-----END CERTIFICATE-----

View File

@ -0,0 +1 @@
subjectAltName=DNS:integration-tests.clickhouse.com

View File

@ -0,0 +1,52 @@
-----BEGIN PRIVATE KEY-----
MIIJQgIBADANBgkqhkiG9w0BAQEFAASCCSwwggkoAgEAAoICAQCeo/K71WdKpVpd
DvhaZy6wBVhFlu7j7DhfTSYvcPpAJfExmzO8JK3vh5/yGyAO1t79gAjqyXLMCZKw
7ajM2rez9YnGYqaFi70BlTcU2KQ8LbFEYRc3cYNDmmWIKBpwpSriWe5SQrRLnDXq
An6T8FG5ejQ/t+1IUMrtZENB4lp8fBmEOJb5yr1TE++6EhiDBQhocLDWWWP8b55k
yZhqP/VgmId4lvboGMRKxbiRJ6/SPr/i/pteBD8jTYfbJr6ceXov/p5yxIp61z5r
y1anU7W3B8jTl/gj7SqtFdSnRajZ0DGJJAUKpiiJSCSlp5YB5Ub2eBBMHmdA5R1M
uiU9TOA35nUW5wkhEOJXnBR/WCsYioVmn/+5dm6JPYiwp/TefYnrx9iLbb/Tyx7M
nXzeyvKg781SwmnvS6Blhtr0zhAW9szZz8cVHPBqFs6PzGs/5mwEC+tM3Zp85aHd
28nIT4NQLHdMDwVmGwmPdy4uavtYWMDhsuIyEU8hCZymiHhPnuHUVbmfZ8GOTIzU
gQAvZb0fL1Xow2Tf6XuARnvuU9weRttg9jSOqPuUENRsFXv0mU8MEpQjrxry88Wf
z7bBEjN5JHC16PB/Nu7zTGJ4/slThbxNv0bIONzvTBPbXrKnxw7Zd9WhGJI+LQxR
qLTynQe6yzDwIuW9LRdBNTp7CtQRwQIDAQABAoICAA0lev0T3z5xW36wueYL/PN7
TehebKeYsMc9BngR/bsJKea5fN0PkRZzf865brusFMifLp3+WbQM6wocd8uaKHUS
WPuGu1P/04bpDap9lYajJriK7ziaAI2+osFYyXAiT954I2bPvk8xv8oHsOOjm7Iq
LWBGZrSCdX6cu3IfRu5f/mFVqzVCFtRmp4wc6ckZxquZAx6QQ9fsjAzAJBBSAoyh
t0BICmgLfWDQ582no0tiBdbS0J9G7NCJIUQI/uzKqFSH3iuWm/84DSUzsZemOT3U
uFDInDil885qK7g87pQ2S5SY1o4eXOebgeX0cFrx3CKaqocUUewv0HDGUEW3NDFs
KhUvlJZIFgk6bMend16U6kfRCUsjLA22Rfxzanl53cGVywCeIMirnLYuEu0TsxyK
CblBvyhcpjrGi7FQskzR+J9LpZPnmtn6TAb7JCAALRVHcAGKhGeh613SjPUfkWb0
KpDps08x8MWGEAALuHbOK0nMLFm+PuMt7+krqCeJET+XM44GT+6ZstrDv0RufxUN
+pkLW7AsVZoXcFvaOWjuyBvX/f6UHCSfueo0mB3H80WoftDIfdhM+AI7/oBTYCBx
Z8BtW+g7Eq3pOUg/Um7S7Z2bybBWE14kpi95gRf3upEYPqHJUpJPdu20lk24iAt9
LCXF4AjZBIdAuyJrYOJBAoIBAQDd/Bm14WvmBOablGLn6hmohi6M75D+/eQanlg9
eJhXJUVd8FzOTjKi70EHWvkqswenNDbe/WGtImqG+9G+N/ol2qhi5xVSQ2XQmcVQ
U+k15Bzm9xKM0OqsStFvRgP1Cy6Ms3/jxr5JEEwUepmjvWTDGTlhTQASA/D7Uh2q
5HpPiHEVm4g5eTAYWeAbI6cGwVS0L4y6xkFGde37Kh2P8ZodWB+d3fglVu4Ok9Nf
wE2f8MK2ewQ0SbF/Nj2WjlVomvOvOJG/2CDLuiH/vc4YUvLAm8pNwvsmgtSh1Okt
E/HfXegrlPPEgw6owqoQFt+aGUITgEhiwEVAcYS0pXzzkQX5AoIBAQC28wJ8ueKr
fINpJM2pSc7WRDFduP5yGsRreSLBXLKMbvOlIVb3PaWp11Cg3+X5O90bPXYJ9mBI
WGR0g14/VD8edxs2D5TUZcP4/vKXGHaWRY9Z4A3jVpjzAxAaviNDHJ08tLXEMXZQ
lbA7dX8z6lpoQfwnPzjBwB01mVegwXPeIwIIfT/FmAiGzvSnAMXBGSGWRRdzof0M
/vPFbgllcQmM4AnEGcErCgFRpwcssO87T2jnvf6QVE5JCcnUcGIli1ThxCU9TRZM
5s6R7Nvk3/UjwcpRcqMtnGpTT2QXSnRwvWUfM+bKTwaxz4PjqKpgIc11kwJAjlxk
4CxYf1mDGLwJAoIBAGFJRTNS8ejDKRXyOE6PaGNVOz2FGLTILJoF34JBQfKfYQFE
gEfiOYry9Dr3AdBW2fnLhmi//3jTZoB2CHwnKDhC1h1STSPaadq8KZ+ExuZZbNlE
WxrfzJlpyNPNiZpxJht/54K57Vc0D0PCX2dFb82ZVm5wQqGinJBocpwcugX1NCpW
GaOmmw9xBCigvWjWffriA/kvPhhVQtEaqg4Vwoctwd18FG645Gf7HV4Pd3WrHIrA
6xzHV0T7To6XHpNTpYybbDT50ZW3o4LjellqsPz8yfK+izdbizjJiM+6t/w+uauw
Ag2Tqm8HsWSPwbtVaoIFbLPqs+8EUTaieFp+qnECggEAVuaTdd9uFfrtCNKchh8z
CoAV2uj2pAim6E3//k0j2qURQozVnFdCC6zk9aWkvYB8BGZrXUwUbAjgnp+P8xD3
cmctG77G+STls66WWMMcAUFFWHGe5y/JMxVvXuSWJ1i+L4m/FVRRWPHhZjznkSdu
jjtZpOLY+N9igIU4JHn/qbKDUrj7w8X1tuMzPuiVBqYDWDe1bg2x/6xS6qLb/71z
xeDdgrKhGOqFud1XARmCaW/M6tdKxg/lp7fokOpZFHBcf2kGL1ogj6LK2HHj+ZGQ
Bc4VZh7H9/BmaPA7IP0S1kKAeBPVOp/TFD737Pm/BC7KQ2DzHusAZEI/jkHfqO/k
0QKCAQEAuiYLn9iLgk4uQO9oaSBGWKrJsR2L2dqI7IWU0X9xJlsQrJKcEeWg4LXt
djLsz0HrxZV/c+Pnh79hmFlBoEmH+hz32D/xd+/qrwwAcMkHAwMbznJu0IIuW2O9
Uzma++7SvVmr9H0DkUwXFP3jn1A2n3uuI4czqtQ8N7GiH0UAWR5CsIP7azHvZTSj
s4Fzf8rTE6pNqVgQXjrVbI9H/h0uPP4alJbhnPba9mgB1cGmfBEnPkKgYNqSZse+
95G2TlcK74sKBUSdBKqYBZ4ZUeTXV974Nva9guE9vzDQt1Cj6k0HWISVPUshPzIh
qrdHdxcM6yhA0Z0Gu6zj+Zsy4lU8gA==
-----END PRIVATE KEY-----

View File

@ -140,21 +140,6 @@ EOL
-->
<core_path>$PWD</core_path>
</clickhouse>
EOL
# Analyzer is not yet ready for testing
cat > /etc/clickhouse-server/users.d/no_analyzer.xml <<EOL
<clickhouse>
<profiles>
<default>
<constraints>
<allow_experimental_analyzer>
<readonly/>
</allow_experimental_analyzer>
</constraints>
</default>
</profiles>
</clickhouse>
EOL
}

View File

@ -78,6 +78,7 @@ remove_keeper_config "create_if_not_exists" "[01]"
rm /etc/clickhouse-server/config.d/merge_tree.xml
rm /etc/clickhouse-server/config.d/enable_wait_for_shutdown_replicated_tables.xml
rm /etc/clickhouse-server/users.d/nonconst_timezone.xml
rm /etc/clickhouse-server/users.d/s3_cache_new.xml
start
stop
@ -114,6 +115,7 @@ sudo chgrp clickhouse /etc/clickhouse-server/config.d/s3_storage_policy_by_defau
rm /etc/clickhouse-server/config.d/merge_tree.xml
rm /etc/clickhouse-server/config.d/enable_wait_for_shutdown_replicated_tables.xml
rm /etc/clickhouse-server/users.d/nonconst_timezone.xml
rm /etc/clickhouse-server/users.d/s3_cache_new.xml
start

View File

@ -8,7 +8,7 @@ sidebar_position: 60
Creates a ClickHouse database with tables from a PostgreSQL database. First, a database with the `MaterializedPostgreSQL` engine creates a snapshot of the PostgreSQL database and loads the required tables. The required tables can include any subset of tables from any subset of schemas of the specified database. Along with the snapshot, the database engine acquires the LSN, and once the initial dump of the tables is performed it starts pulling updates from the WAL. After the database is created, tables newly added to the PostgreSQL database are not automatically added to replication. They have to be added manually with the `ATTACH TABLE db.table` query.
Replication is implemented with the PostgreSQL Logical Replication Protocol, which does not allow replicating DDL but makes it possible to know whether replication-breaking changes happened (column type changes, adding/removing columns). Such changes are detected, and the corresponding tables stop receiving updates. In this case you should use `ATTACH`/ `DETACH` queries to reload the table completely. If the DDL does not break replication (for example, renaming a column), the table will still receive updates (insertion is done by position).
Replication is implemented with the PostgreSQL Logical Replication Protocol, which does not allow replicating DDL but makes it possible to know whether replication-breaking changes happened (column type changes, adding/removing columns). Such changes are detected, and the corresponding tables stop receiving updates. In this case you should use `ATTACH`/ `DETACH PERMANENTLY` queries to reload the table completely. If the DDL does not break replication (for example, renaming a column), the table will still receive updates (insertion is done by position).
:::note
This database engine is experimental. To use it, set `allow_experimental_database_materialized_postgresql` to 1 in your configuration files or by using the `SET` command:
@ -63,7 +63,7 @@ Before version 22.1, adding a table to replication left a non-removed temporary
It is possible to remove specific tables from replication:
``` sql
DETACH TABLE postgres_database.table_to_remove;
DETACH TABLE postgres_database.table_to_remove PERMANENTLY;
```
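Conversely, a table newly added to the PostgreSQL database can be attached manually, as described above (a sketch; `table_to_add` is a hypothetical table name):
``` sql
ATTACH TABLE postgres_database.table_to_add;
```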
## PostgreSQL schema {#schema}

View File

@ -2740,7 +2740,7 @@ ClickHouse will use it to form the proxy URI using the following template: `{pro
<proxy_cache_time>10</proxy_cache_time>
</resolver>
</http>
<https>
<resolver>
<endpoint>http://resolver:8080/hostname</endpoint>

View File

@ -0,0 +1,59 @@
---
slug: /en/operations/system-tables/blob_storage_log
---
# Blob Storage Operations Log
Contains logging entries with information about various blob storage operations such as uploads and deletes.
Columns:
- `event_date` ([Date](../../sql-reference/data-types/date.md)) — Date of the event.
- `event_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — Time of the event.
- `event_time_microseconds` ([DateTime64](../../sql-reference/data-types/datetime64.md)) — Time of the event with microseconds precision.
- `event_type` ([Enum8](../../sql-reference/data-types/enum.md)) — Type of the event. Possible values:
- `'Upload'`
- `'Delete'`
- `'MultiPartUploadCreate'`
- `'MultiPartUploadWrite'`
- `'MultiPartUploadComplete'`
- `'MultiPartUploadAbort'`
- `query_id` ([String](../../sql-reference/data-types/string.md)) — Identifier of the query associated with the event, if any.
- `thread_id` ([UInt64](../../sql-reference/data-types/int-uint.md#uint-ranges)) — Identifier of the thread performing the operation.
- `thread_name` ([String](../../sql-reference/data-types/string.md)) — Name of the thread performing the operation.
- `disk_name` ([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md)) — Name of the associated disk.
- `bucket` ([String](../../sql-reference/data-types/string.md)) — Name of the bucket.
- `remote_path` ([String](../../sql-reference/data-types/string.md)) — Path to the remote resource.
- `local_path` ([String](../../sql-reference/data-types/string.md)) — Path to the metadata file on the local system, which references the remote resource.
- `data_size` ([UInt32](../../sql-reference/data-types/int-uint.md#uint-ranges)) — Size of the data involved in the upload event.
- `error` ([String](../../sql-reference/data-types/string.md)) — Error message associated with the event, if any.
**Example**
Suppose a blob storage operation uploads a file, and an event is logged:
```sql
SELECT * FROM system.blob_storage_log WHERE query_id = '7afe0450-504d-4e4b-9a80-cd9826047972' ORDER BY event_date, event_time_microseconds \G
```
```text
Row 1:
──────
event_date: 2023-10-31
event_time: 2023-10-31 16:03:40
event_time_microseconds: 2023-10-31 16:03:40.481437
event_type: Upload
query_id: 7afe0450-504d-4e4b-9a80-cd9826047972
thread_id: 2381740
disk_name: disk_s3
bucket: bucket1
remote_path: rrr/kxo/tbnqtrghgtnxkzgtcrlutwuslgawe
local_path: store/654/6549e8b3-d753-4447-8047-d462df6e6dbe/tmp_insert_all_1_1_0/checksums.txt
data_size: 259
error:
```
In this example, the upload operation was associated with the `INSERT` query with ID `7afe0450-504d-4e4b-9a80-cd9826047972`. The local metadata file `store/654/6549e8b3-d753-4447-8047-d462df6e6dbe/tmp_insert_all_1_1_0/checksums.txt` refers to the remote path `rrr/kxo/tbnqtrghgtnxkzgtcrlutwuslgawe` in bucket `bucket1` on disk `disk_s3`, with a size of 259 bytes.
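As another illustration, recent delete events on the same disk could be inspected like this (a sketch; the `disk_s3` filter and the limit are just examples):
```sql
SELECT event_time, bucket, remote_path, error
FROM system.blob_storage_log
WHERE event_type = 'Delete' AND disk_name = 'disk_s3'
ORDER BY event_time DESC
LIMIT 10
```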
**See Also**
- [External Disks for Storing Data](../../operations/storing-data.md)

View File

@ -439,7 +439,7 @@ concat(s1, s2, ...)
**Arguments**
At least two values of arbitrary type.
At least one value of arbitrary type.
Arguments which are not of types [String](../../sql-reference/data-types/string.md) or [FixedString](../../sql-reference/data-types/fixedstring.md) are converted to strings using their default serialization. As this decreases performance, it is not recommended to use non-String/FixedString arguments.
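To illustrate the relaxed arity and the conversion rule, a minimal sketch (assuming an interactive ClickHouse session; the values are made up):
```sql
-- A single argument is now accepted.
SELECT concat('abc');

-- Non-String arguments are converted via their default serialization.
SELECT concat(42, ' rows in ', 1.5, ' seconds'); -- '42 rows in 1.5 seconds'
```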

View File

@ -52,8 +52,6 @@ contents:
dst: /lib/systemd/system/clickhouse-server.service
- src: root/usr/bin/clickhouse-copier
dst: /usr/bin/clickhouse-copier
- src: root/usr/bin/clickhouse-report
dst: /usr/bin/clickhouse-report
- src: root/usr/bin/clickhouse-server
dst: /usr/bin/clickhouse-server
# clickhouse-keeper part

View File

@ -63,8 +63,6 @@ option (ENABLE_CLICKHOUSE_SU "A tool similar to 'su'" ${ENABLE_CLICKHOUSE_ALL})
option (ENABLE_CLICKHOUSE_DISKS "A tool to manage disks" ${ENABLE_CLICKHOUSE_ALL})
option (ENABLE_CLICKHOUSE_REPORT "A tiny tool to collect a clickhouse-server state" ${ENABLE_CLICKHOUSE_ALL})
if (NOT ENABLE_NURAFT)
# RECONFIGURE_MESSAGE_LEVEL should not be used here,
# since ENABLE_NURAFT is set to OFF for FreeBSD and Darwin.
@ -390,9 +388,6 @@ if (ENABLE_CLICKHOUSE_SU)
install (FILES "${CMAKE_CURRENT_BINARY_DIR}/clickhouse-su" DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)
list(APPEND CLICKHOUSE_BUNDLE clickhouse-su)
endif ()
if (ENABLE_CLICKHOUSE_REPORT)
include(${ClickHouse_SOURCE_DIR}/utils/report/CMakeLists.txt)
endif ()
if (ENABLE_CLICKHOUSE_KEEPER)
if (NOT BUILD_STANDALONE_KEEPER AND CREATE_KEEPER_SYMLINK)

View File

@ -1238,7 +1238,6 @@ void Client::processConfig()
global_context->setCurrentQueryId(query_id);
}
print_stack_trace = config().getBool("stacktrace", false);
logging_initialized = true;
if (config().has("multiquery"))
is_multiquery = true;

View File

@ -563,9 +563,6 @@ catch (...)
void LocalServer::updateLoggerLevel(const String & logs_level)
{
if (!logging_initialized)
return;
config().setString("logger.level", logs_level);
updateLevels(config(), logger());
}
@ -607,21 +604,13 @@ void LocalServer::processConfig()
Poco::AutoPtr<OwnPatternFormatter> pf = new OwnPatternFormatter;
Poco::AutoPtr<OwnFormattingChannel> log = new OwnFormattingChannel(pf, new Poco::SimpleFileChannel(server_logs_file));
Poco::Logger::root().setChannel(log);
logging_initialized = true;
}
else if (logging || is_interactive)
{
config().setString("logger", "logger");
auto log_level_default = is_interactive && !logging ? "none" : level;
config().setString("logger.level", config().getString("log-level", config().getString("send_logs_level", log_level_default)));
buildLoggers(config(), logger(), "clickhouse-local");
logging_initialized = true;
}
else
{
Poco::Logger::root().setLevel("none");
Poco::Logger::root().setChannel(Poco::AutoPtr<Poco::NullChannel>(new Poco::NullChannel()));
logging_initialized = false;
config().setString("logger", "logger");
auto log_level_default = logging ? level : "fatal";
config().setString("logger.level", config().getString("log-level", config().getString("send_logs_level", log_level_default)));
buildLoggers(config(), logger(), "clickhouse-local");
}
shared_context = Context::createShared();

View File

@ -1248,6 +1248,25 @@
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</backup_log>
<!-- Storage S3Queue log.
-->
<s3queue_log>
<database>system</database>
<table>s3queue_log</table>
<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</s3queue_log>
<!-- Blob storage object operations log.
-->
<blob_storage_log>
<database>system</database>
<table>blob_storage_log</table>
<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<ttl>event_date + INTERVAL 30 DAY</ttl>
</blob_storage_log>
<!-- <top_level_domains_path>/var/lib/clickhouse/top_level_domains/</top_level_domains_path> -->
<!-- Custom TLD lists.
Format: <name>/path/to/file</name>

View File

@ -5,11 +5,214 @@
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/ConstantNode.h>
#include <Analyzer/JoinNode.h>
#include <Analyzer/HashUtils.h>
#include <Analyzer/Utils.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// Visitor that optimizes logical expressions _only_ in JOIN ON section
class JoinOnLogicalExpressionOptimizerVisitor : public InDepthQueryTreeVisitorWithContext<JoinOnLogicalExpressionOptimizerVisitor>
{
public:
using Base = InDepthQueryTreeVisitorWithContext<JoinOnLogicalExpressionOptimizerVisitor>;
explicit JoinOnLogicalExpressionOptimizerVisitor(ContextPtr context)
: Base(std::move(context))
{}
void enterImpl(QueryTreeNodePtr & node)
{
auto * function_node = node->as<FunctionNode>();
if (!function_node)
return;
if (function_node->getFunctionName() == "or")
{
bool is_argument_type_changed = tryOptimizeIsNotDistinctOrIsNull(node, getContext());
if (is_argument_type_changed)
need_rerun_resolve = true;
return;
}
}
void leaveImpl(QueryTreeNodePtr & node)
{
if (!need_rerun_resolve)
return;
if (auto * function_node = node->as<FunctionNode>())
rerunFunctionResolve(function_node, getContext());
}
private:
bool need_rerun_resolve = false;
/// Returns true if type of some operand is changed and parent function needs to be re-resolved
static bool tryOptimizeIsNotDistinctOrIsNull(QueryTreeNodePtr & node, const ContextPtr & context)
{
auto & function_node = node->as<FunctionNode &>();
chassert(function_node.getFunctionName() == "or");
QueryTreeNodes or_operands;
or_operands.reserve(function_node.getArguments().getNodes().size());
/// Indices of `equals` or `isNotDistinctFrom` functions in the vector above
std::vector<size_t> equals_functions_indices;
/** Map from an `isNull` argument to the indices of operands that contain that `isNull` function
* `a = b OR (a IS NULL AND b IS NULL) OR (a IS NULL AND c IS NULL)`
* will be mapped to
* {
* a => [(a IS NULL AND b IS NULL), (a IS NULL AND c IS NULL)]
* b => [(a IS NULL AND b IS NULL)]
* c => [(a IS NULL AND c IS NULL)]
* }
* Then for each a <=> b we can find all operands that contain both a IS NULL and b IS NULL
*/
QueryTreeNodePtrWithHashMap<std::vector<size_t>> is_null_argument_to_indices;
for (const auto & argument : function_node.getArguments())
{
or_operands.push_back(argument);
auto * argument_function = argument->as<FunctionNode>();
if (!argument_function)
continue;
const auto & func_name = argument_function->getFunctionName();
if (func_name == "equals" || func_name == "isNotDistinctFrom")
{
equals_functions_indices.push_back(or_operands.size() - 1);
}
else if (func_name == "and")
{
for (const auto & and_argument : argument_function->getArguments().getNodes())
{
auto * and_argument_function = and_argument->as<FunctionNode>();
if (and_argument_function && and_argument_function->getFunctionName() == "isNull")
{
const auto & is_null_argument = and_argument_function->getArguments().getNodes()[0];
is_null_argument_to_indices[is_null_argument].push_back(or_operands.size() - 1);
}
}
}
}
/// OR operands that were changed and need to be re-resolved
std::unordered_set<size_t> arguments_to_reresolve;
for (size_t equals_function_idx : equals_functions_indices)
{
auto * equals_function = or_operands[equals_function_idx]->as<FunctionNode>();
/// For a <=> b we are looking for expressions containing both `a IS NULL` and `b IS NULL` combined with AND
const auto & argument_nodes = equals_function->getArguments().getNodes();
const auto & lhs_is_null_parents = is_null_argument_to_indices[argument_nodes[0]];
const auto & rhs_is_null_parents = is_null_argument_to_indices[argument_nodes[1]];
std::unordered_set<size_t> operands_to_optimize;
std::set_intersection(lhs_is_null_parents.begin(), lhs_is_null_parents.end(),
rhs_is_null_parents.begin(), rhs_is_null_parents.end(),
std::inserter(operands_to_optimize, operands_to_optimize.begin()));
/// If we have `a = b OR (a IS NULL AND b IS NULL)` we can optimize it to `a <=> b`
if (!operands_to_optimize.empty() && equals_function->getFunctionName() == "equals")
arguments_to_reresolve.insert(equals_function_idx);
for (size_t to_optimize_idx : operands_to_optimize)
{
/// We are looking for operand `a IS NULL AND b IS NULL AND ...`
auto * operand_to_optimize = or_operands[to_optimize_idx]->as<FunctionNode>();
/// Remove `a IS NULL` and `b IS NULL` arguments from AND
QueryTreeNodes new_arguments;
for (const auto & and_argument : operand_to_optimize->getArguments().getNodes())
{
bool to_eliminate = false;
const auto * and_argument_function = and_argument->as<FunctionNode>();
if (and_argument_function && and_argument_function->getFunctionName() == "isNull")
{
const auto & is_null_argument = and_argument_function->getArguments().getNodes()[0];
to_eliminate = (is_null_argument->isEqual(*argument_nodes[0]) || is_null_argument->isEqual(*argument_nodes[1]));
}
if (to_eliminate)
arguments_to_reresolve.insert(to_optimize_idx);
else
new_arguments.emplace_back(and_argument);
}
/// If fewer than two arguments are left, the whole AND will be removed or replaced below
operand_to_optimize->getArguments().getNodes() = std::move(new_arguments);
}
}
if (arguments_to_reresolve.empty())
/// Nothing has been changed
return false;
auto and_function_resolver = FunctionFactory::instance().get("and", context);
auto strict_equals_function_resolver = FunctionFactory::instance().get("isNotDistinctFrom", context);
bool need_reresolve = false;
QueryTreeNodes new_or_operands;
for (size_t i = 0; i < or_operands.size(); ++i)
{
if (arguments_to_reresolve.contains(i))
{
auto * function = or_operands[i]->as<FunctionNode>();
if (function->getFunctionName() == "equals")
{
/// We should replace `a = b` with `a <=> b` because we removed checks for IS NULL
need_reresolve |= function->getResultType()->isNullable();
function->resolveAsFunction(strict_equals_function_resolver);
new_or_operands.emplace_back(std::move(or_operands[i]));
}
else if (function->getFunctionName() == "and")
{
const auto & and_arguments = function->getArguments().getNodes();
if (and_arguments.size() > 1)
{
function->resolveAsFunction(and_function_resolver);
new_or_operands.emplace_back(std::move(or_operands[i]));
}
else if (and_arguments.size() == 1)
{
/// Replace AND with a single argument with the argument itself
new_or_operands.emplace_back(and_arguments[0]);
}
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected function name: '{}'", function->getFunctionName());
}
else
{
new_or_operands.emplace_back(std::move(or_operands[i]));
}
}
if (new_or_operands.size() == 1)
{
node = std::move(new_or_operands[0]);
return need_reresolve;
}
/// Rebuild OR function
auto or_function_resolver = FunctionFactory::instance().get("or", context);
function_node.getArguments().getNodes() = std::move(new_or_operands);
function_node.resolveAsFunction(or_function_resolver);
return need_reresolve;
}
};
class LogicalExpressionOptimizerVisitor : public InDepthQueryTreeVisitorWithContext<LogicalExpressionOptimizerVisitor>
{
public:
@ -21,6 +224,17 @@ public:
void enterImpl(QueryTreeNodePtr & node)
{
if (auto * join_node = node->as<JoinNode>())
{
/// Operator <=> is not supported outside of JOIN ON section
if (join_node->hasJoinExpression())
{
JoinOnLogicalExpressionOptimizerVisitor join_on_visitor(getContext());
join_on_visitor.visit(join_node->getJoinExpression());
}
return;
}
auto * function_node = node->as<FunctionNode>();
if (!function_node)
@ -38,6 +252,7 @@ public:
return;
}
}
private:
void tryReplaceAndEqualsChainsWithConstant(QueryTreeNodePtr & node)
{

View File

@ -67,6 +67,17 @@ namespace DB
* FROM TABLE
* WHERE a = 1 AND b = 'test';
* -------------------------------
*
* 5. Remove unnecessary IS NULL checks in JOIN ON clause
* - equality check with explicit IS NULL check replaced with <=> operator
* -------------------------------
* SELECT * FROM t1 JOIN t2 ON a = b OR (a IS NULL AND b IS NULL)
* SELECT * FROM t1 JOIN t2 ON a <=> b OR (a IS NULL AND b IS NULL)
*
* will be transformed into
*
* SELECT * FROM t1 JOIN t2 ON a <=> b
* -------------------------------
*/
class LogicalExpressionOptimizerPass final : public IQueryTreePass

View File

@ -127,6 +127,9 @@ BackupReaderS3::BackupReaderS3(
request_settings.max_single_read_retries = context_->getSettingsRef().s3_max_single_read_retries; // FIXME: Avoid taking value for endpoint
request_settings.allow_native_copy = allow_s3_native_copy;
client = makeS3Client(s3_uri_, access_key_id_, secret_access_key_, s3_settings, context_);
if (auto blob_storage_system_log = context_->getBlobStorageLog())
blob_storage_log = std::make_shared<BlobStorageLogWriter>(blob_storage_system_log);
}
BackupReaderS3::~BackupReaderS3() = default;
@ -178,6 +181,7 @@ void BackupReaderS3::copyFileToDisk(const String & path_in_backup, size_t file_s
/* dest_key= */ blob_path[0],
s3_settings.request_settings,
read_settings,
blob_storage_log,
object_attributes,
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupReaderS3"),
/* for_disk_s3= */ true);
@ -214,6 +218,12 @@ BackupWriterS3::BackupWriterS3(
request_settings.allow_native_copy = allow_s3_native_copy;
request_settings.setStorageClassName(storage_class_name);
client = makeS3Client(s3_uri_, access_key_id_, secret_access_key_, s3_settings, context_);
if (auto blob_storage_system_log = context_->getBlobStorageLog())
{
blob_storage_log = std::make_shared<BlobStorageLogWriter>(blob_storage_system_log);
if (context_->hasQueryContext())
blob_storage_log->query_id = context_->getQueryContext()->getCurrentQueryId();
}
}
void BackupWriterS3::copyFileFromDisk(const String & path_in_backup, DiskPtr src_disk, const String & src_path,
@ -239,6 +249,7 @@ void BackupWriterS3::copyFileFromDisk(const String & path_in_backup, DiskPtr src
fs::path(s3_uri.key) / path_in_backup,
s3_settings.request_settings,
read_settings,
blob_storage_log,
{},
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWriterS3"));
return; /// copied!
@ -262,13 +273,15 @@ void BackupWriterS3::copyFile(const String & destination, const String & source,
fs::path(s3_uri.key) / destination,
s3_settings.request_settings,
read_settings,
blob_storage_log,
{},
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWriterS3"));
}
void BackupWriterS3::copyDataToFile(const String & path_in_backup, const CreateReadBufferFunction & create_read_buffer, UInt64 start_pos, UInt64 length)
{
copyDataToS3File(create_read_buffer, start_pos, length, client, s3_uri.bucket, fs::path(s3_uri.key) / path_in_backup, s3_settings.request_settings, {},
copyDataToS3File(create_read_buffer, start_pos, length, client, s3_uri.bucket, fs::path(s3_uri.key) / path_in_backup,
s3_settings.request_settings, blob_storage_log, {},
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWriterS3"));
}
@ -302,6 +315,7 @@ std::unique_ptr<WriteBuffer> BackupWriterS3::writeFile(const String & file_name)
fs::path(s3_uri.key) / file_name,
DBMS_DEFAULT_BUFFER_SIZE,
s3_settings.request_settings,
blob_storage_log,
std::nullopt,
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWriterS3"),
write_settings);
@ -311,8 +325,19 @@ void BackupWriterS3::removeFile(const String & file_name)
{
S3::DeleteObjectRequest request;
request.SetBucket(s3_uri.bucket);
request.SetKey(fs::path(s3_uri.key) / file_name);
auto key = fs::path(s3_uri.key) / file_name;
request.SetKey(key);
auto outcome = client->DeleteObject(request);
if (blob_storage_log)
{
blob_storage_log->addEvent(
BlobStorageLogElement::EventType::Delete,
s3_uri.bucket, key, /* local_path */ "", /* data_size */ 0,
outcome.IsSuccess() ? nullptr : &outcome.GetError());
}
if (!outcome.IsSuccess() && !isNotFoundError(outcome.GetError().GetErrorType()))
throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
}
@ -371,6 +396,16 @@ void BackupWriterS3::removeFilesBatch(const Strings & file_names)
request.SetDelete(delkeys);
auto outcome = client->DeleteObjects(request);
if (blob_storage_log)
{
const auto * outcome_error = outcome.IsSuccess() ? nullptr : &outcome.GetError();
auto time_now = std::chrono::system_clock::now();
for (const auto & obj : current_chunk)
blob_storage_log->addEvent(BlobStorageLogElement::EventType::Delete, s3_uri.bucket, obj.GetKey(),
/* local_path */ "", /* data_size */ 0, outcome_error, time_now);
}
if (!outcome.IsSuccess() && !isNotFoundError(outcome.GetError().GetErrorType()))
throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
}

View File

@ -8,7 +8,7 @@
#include <IO/S3Common.h>
#include <Storages/StorageS3Settings.h>
#include <Interpreters/Context_fwd.h>
#include <IO/S3/BlobStorageLogWriter.h>
namespace DB
{
@ -32,6 +32,8 @@ private:
const DataSourceDescription data_source_description;
S3Settings s3_settings;
std::shared_ptr<S3::Client> client;
BlobStorageLogWriterPtr blob_storage_log;
};
@ -63,6 +65,8 @@ private:
S3Settings s3_settings;
std::shared_ptr<S3::Client> client;
std::optional<bool> supports_batch_delete;
BlobStorageLogWriterPtr blob_storage_log;
};
}

View File

@ -321,8 +321,6 @@ protected:
bool allow_merge_tree_settings = false;
bool cancelled = false;
bool logging_initialized = false;
};
}

View File

@ -17,6 +17,7 @@
#include <Interpreters/TransactionsInfoLog.h>
#include <Interpreters/AsynchronousInsertLog.h>
#include <Interpreters/BackupLog.h>
#include <IO/S3/BlobStorageLogWriter.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <Common/SystemLogBase.h>

View File

@ -31,7 +31,8 @@
M(FilesystemCacheLogElement) \
M(FilesystemReadPrefetchesLogElement) \
M(AsynchronousInsertLogElement) \
M(BackupLogElement)
M(BackupLogElement) \
M(BlobStorageLogElement)
namespace Poco
{

View File

@ -617,8 +617,13 @@ Changelog::Changelog(
/// Load all files on changelog disks
std::unordered_set<DiskPtr> read_disks;
const auto load_from_disk = [&](const auto & disk)
{
if (read_disks.contains(disk))
return;
LOG_TRACE(log, "Reading from disk {}", disk->getName());
std::unordered_map<std::string, std::string> incomplete_files;
@ -639,19 +644,25 @@ Changelog::Changelog(
std::vector<std::string> changelog_files;
for (auto it = disk->iterateDirectory(""); it->isValid(); it->next())
{
if (it->name() == changelogs_detached_dir)
const auto & file_name = it->name();
if (file_name == changelogs_detached_dir)
continue;
if (it->name().starts_with(tmp_prefix))
if (file_name.starts_with(tmp_prefix))
{
incomplete_files.emplace(it->name().substr(tmp_prefix.size()), it->path());
incomplete_files.emplace(file_name.substr(tmp_prefix.size()), it->path());
continue;
}
if (clean_incomplete_file(it->path()))
continue;
changelog_files.push_back(it->path());
if (file_name.starts_with(DEFAULT_PREFIX))
{
if (!clean_incomplete_file(it->path()))
changelog_files.push_back(it->path());
}
else
{
LOG_WARNING(log, "Unknown file found in log directory: {}", file_name);
}
}
for (const auto & changelog_file : changelog_files)
@ -671,6 +682,8 @@ Changelog::Changelog(
for (const auto & [name, path] : incomplete_files)
disk->removeFile(path);
read_disks.insert(disk);
};
/// Load all files from old disks

View File

@ -539,8 +539,12 @@ KeeperSnapshotManager::KeeperSnapshotManager(
, storage_tick_time(storage_tick_time_)
, keeper_context(keeper_context_)
{
std::unordered_set<DiskPtr> read_disks;
const auto load_snapshot_from_disk = [&](const auto & disk)
{
if (read_disks.contains(disk))
return;
LOG_TRACE(log, "Reading from disk {}", disk->getName());
std::unordered_map<std::string, std::string> incomplete_files;
@ -590,6 +594,8 @@ KeeperSnapshotManager::KeeperSnapshotManager(
for (const auto & [name, path] : incomplete_files)
disk->removeFile(path);
read_disks.insert(disk);
};
for (const auto & disk : keeper_context->getOldSnapshotDisks())

View File

@ -147,12 +147,14 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const SnapshotFileInfo & snapsh
const auto create_writer = [&](const auto & key)
{
/// blob_storage_log is not used for keeper
return WriteBufferFromS3(
s3_client->client,
s3_client->uri.bucket,
key,
DBMS_DEFAULT_BUFFER_SIZE,
request_settings_1
request_settings_1,
/* blob_log */ {}
);
};
@ -214,6 +216,7 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const SnapshotFileInfo & snapsh
delete_request.SetBucket(s3_client->uri.bucket);
delete_request.SetKey(lock_file);
auto delete_outcome = s3_client->client->DeleteObject(delete_request);
if (!delete_outcome.IsSuccess())
throw S3Exception(delete_outcome.GetError().GetMessage(), delete_outcome.GetError().GetErrorType());
}

View File

@ -269,6 +269,7 @@ KeeperStateManager::KeeperStateManager(
void KeeperStateManager::loadLogStore(uint64_t last_commited_index, uint64_t logs_to_keep)
{
log_store->init(last_commited_index, logs_to_keep);
log_store_initialized = true;
}
void KeeperStateManager::system_exit(const int /* exit_code */)
@ -361,6 +362,8 @@ void KeeperStateManager::save_state(const nuraft::srv_state & state)
nuraft::ptr<nuraft::srv_state> KeeperStateManager::read_state()
{
chassert(log_store_initialized);
const auto & old_path = getOldServerStatePath();
auto disk = getStateFileDisk();
@ -454,7 +457,12 @@ nuraft::ptr<nuraft::srv_state> KeeperStateManager::read_state()
disk->removeFile(copy_lock_file);
}
LOG_WARNING(logger, "No state was read");
if (log_store->next_slot() != 1)
LOG_ERROR(
logger,
"No state was read but Keeper contains data which indicates that the state file was lost. This is dangerous and can lead to "
"data loss.");
return nullptr;
}

View File

@ -121,6 +121,7 @@ private:
mutable std::mutex configuration_wrapper_mutex;
KeeperConfigurationWrapper configuration_wrapper TSA_GUARDED_BY(configuration_wrapper_mutex);
bool log_store_initialized = false;
nuraft::ptr<KeeperLogStore> log_store;
const String server_state_file_name;

View File

@ -235,6 +235,11 @@ std::shared_ptr<FilesystemReadPrefetchesLog> Context::getFilesystemReadPrefetche
return nullptr;
}
std::shared_ptr<BlobStorageLog> Context::getBlobStorageLog() const
{
return nullptr;
}
void Context::setConfig(const ConfigurationPtr & config)
{
auto lock = getGlobalLock();

View File

@ -27,6 +27,7 @@ struct ContextSharedPart;
class Macros;
class FilesystemCacheLog;
class FilesystemReadPrefetchesLog;
class BlobStorageLog;
/// A small class which owns ContextShared.
/// We don't use something like unique_ptr directly to allow ContextShared type to be incomplete.
@ -115,6 +116,7 @@ public:
std::shared_ptr<FilesystemCacheLog> getFilesystemCacheLog() const;
std::shared_ptr<FilesystemReadPrefetchesLog> getFilesystemReadPrefetchesLog() const;
std::shared_ptr<BlobStorageLog> getBlobStorageLog() const;
enum class ApplicationType
{

View File

@ -729,6 +729,7 @@ class IColumn;
M(Bool, skip_download_if_exceeds_query_cache, true, "Skip download from remote filesystem if exceeds query cache size", 0) \
M(UInt64, filesystem_cache_max_download_size, (128UL * 1024 * 1024 * 1024), "Max remote filesystem cache size that can be downloaded by a single query", 0) \
M(Bool, throw_on_error_from_cache_on_write_operations, false, "Ignore error from cache when caching on write operations (INSERT, merges)", 0) \
M(UInt64, filesystem_cache_segments_batch_size, 20, "Limit on size of a single batch of file segments that a read buffer can request from cache. Too low value will lead to excessive requests to cache, too large may slow down eviction from cache", 0) \
\
M(Bool, load_marks_asynchronously, false, "Load MergeTree marks asynchronously", 0) \
M(Bool, enable_filesystem_read_prefetches_log, false, "Log to system.filesystem prefetch_log during query. Should be used only for testing or debugging, not recommended to be turned on by default", 0) \
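As an aside, the new `filesystem_cache_segments_batch_size` setting added in the hunk above is declared as an ordinary query-level setting, so presumably it can be overridden per session, e.g. (a sketch):
```sql
-- Session-level override of the batch size (assumed to be tunable like any other setting).
SET filesystem_cache_segments_batch_size = 50;
```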

View File

@ -40,13 +40,15 @@ DatabaseFilesystem::DatabaseFilesystem(const String & name_, const String & path
{
path = user_files_path / path;
}
else if (!is_local && !pathStartsWith(fs::path(path), user_files_path))
path = fs::absolute(path).lexically_normal();
if (!is_local && !pathStartsWith(fs::path(path), user_files_path))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Path must be inside user-files path: {}", user_files_path.string());
}
path = fs::absolute(path).lexically_normal();
if (!fs::exists(path))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Path does not exist: {}", path);
}

View File

@ -26,10 +26,11 @@ void processSQLiteError(const String & message, bool throw_on_error)
String validateSQLiteDatabasePath(const String & path, const String & user_files_path, bool need_check, bool throw_on_error)
{
if (fs::path(path).is_relative())
return fs::absolute(fs::path(user_files_path) / path).lexically_normal();
String absolute_path = fs::absolute(path).lexically_normal();
if (fs::path(path).is_relative())
absolute_path = fs::absolute(fs::path(user_files_path) / path).lexically_normal();
String absolute_user_files_path = fs::absolute(user_files_path).lexically_normal();
if (need_check && !absolute_path.starts_with(absolute_user_files_path))

View File

@ -60,9 +60,9 @@ public:
void createDirectories(const String & path) override
{
auto tx = createEncryptedTransaction();
tx->createDirectories(path);
tx->commit();
auto wrapped_path = wrappedPath(path);
/// Delegate disk can have retry logic for recursive directory creation. Let it handle it.
delegate->createDirectories(wrapped_path);
}
void clearDirectory(const String & path) override

View File

@ -114,30 +114,41 @@ void CachedOnDiskReadBufferFromFile::appendFilesystemCacheLog(
cache_log->add(std::move(elem));
}
void CachedOnDiskReadBufferFromFile::initialize(size_t offset, size_t size)
bool CachedOnDiskReadBufferFromFile::nextFileSegmentsBatch()
{
chassert(!file_segments || file_segments->empty());
size_t size = getRemainingSizeToRead();
if (!size)
return false;
if (settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache)
{
file_segments = cache->get(cache_key, file_offset_of_buffer_end, size, settings.filesystem_cache_segments_batch_size);
}
else
{
CreateFileSegmentSettings create_settings(FileSegmentKind::Regular);
file_segments = cache->getOrSet(cache_key, file_offset_of_buffer_end, size, file_size.value(), create_settings, settings.filesystem_cache_segments_batch_size);
}
return !file_segments->empty();
}
void CachedOnDiskReadBufferFromFile::initialize()
{
if (initialized)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Caching buffer already initialized");
implementation_buffer.reset();
if (settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache)
{
file_segments = cache->get(cache_key, offset, size);
}
else
{
CreateFileSegmentSettings create_settings(FileSegmentKind::Regular);
file_segments = cache->getOrSet(cache_key, offset, size, file_size.value(), create_settings);
}
/**
* Segments in returned list are ordered in ascending order and represent a full contiguous
* interval (no holes). Each segment in returned list has state: DOWNLOADED, DOWNLOADING or EMPTY.
*/
if (file_segments->empty())
if (!nextFileSegmentsBatch())
throw Exception(ErrorCodes::LOGICAL_ERROR, "List of file segments cannot be empty");
chassert(!file_segments->empty());
LOG_TEST(
log,
"Having {} file segments to read: {}, current offset: {}",
@ -512,7 +523,7 @@ bool CachedOnDiskReadBufferFromFile::completeFileSegmentAndGetNext()
cache_file_reader.reset();
file_segments->popFront();
if (file_segments->empty())
if (file_segments->empty() && !nextFileSegmentsBatch())
return false;
current_file_segment = &file_segments->front();
@ -788,9 +799,9 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
return false;
if (!initialized)
initialize(file_offset_of_buffer_end, getTotalSizeToRead());
initialize();
if (file_segments->empty())
if (file_segments->empty() && !nextFileSegmentsBatch())
return false;
const size_t original_buffer_size = internal_buffer.size();
@ -1159,7 +1170,7 @@ off_t CachedOnDiskReadBufferFromFile::seek(off_t offset, int whence)
return new_pos;
}
size_t CachedOnDiskReadBufferFromFile::getTotalSizeToRead()
size_t CachedOnDiskReadBufferFromFile::getRemainingSizeToRead()
{
/// Last position should be guaranteed to be set, as at least we always know file size.
if (!read_until_position)

View File

@ -63,7 +63,7 @@ public:
private:
using ImplementationBufferPtr = std::shared_ptr<ReadBufferFromFileBase>;
void initialize(size_t offset, size_t size);
void initialize();
/**
* Return a list of file segments ordered in ascending order. This list represents
@ -85,7 +85,7 @@ private:
bool nextImplStep();
size_t getTotalSizeToRead();
size_t getRemainingSizeToRead();
bool completeFileSegmentAndGetNext();
@ -95,6 +95,8 @@ private:
static bool canStartFromCache(size_t current_offset, const FileSegment & file_segment);
bool nextFileSegmentsBatch();
Poco::Logger * log;
FileCache::Key cache_key;
String source_file_path;

View File

@ -209,7 +209,7 @@ void ReadBufferFromRemoteFSGather::setReadUntilPosition(size_t position)
void ReadBufferFromRemoteFSGather::reset()
{
current_object = {};
current_object = StoredObject();
current_buf_idx = {};
current_buf.reset();
}

View File

@ -519,7 +519,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskObjectStorage::readFile(
return object_storage->readObjects(
storage_objects,
object_storage->getAdjustedSettingsFromMetadataFile(updateResourceLink(settings, getReadResourceName()), path),
updateResourceLink(settings, getReadResourceName()),
read_hint,
file_size);
}
@ -532,12 +532,9 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorage::writeFile(
{
LOG_TEST(log, "Write file: {}", path);
WriteSettings write_settings = updateResourceLink(settings, getWriteResourceName());
auto transaction = createObjectStorageTransaction();
return transaction->writeFile(
path,
buf_size,
mode,
object_storage->getAdjustedSettingsFromMetadataFile(updateResourceLink(settings, getWriteResourceName()), path));
return transaction->writeFile(path, buf_size, mode, write_settings);
}
Strings DiskObjectStorage::getBlobPath(const String & path) const

View File

@ -684,7 +684,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorageTransaction::writeFile
}
/// seems ok
auto object = StoredObject(object_key.serialize());
auto object = StoredObject(object_key.serialize(), path);
std::function<void(size_t count)> create_metadata_callback;
if (autocommit)
@ -782,7 +782,7 @@ void DiskObjectStorageTransaction::writeFileUsingBlobWritingFunction(
}
/// seems ok
auto object = StoredObject(object_key.serialize());
auto object = StoredObject(object_key.serialize(), path);
auto write_operation = std::make_unique<WriteFileObjectStorageOperation>(object_storage, metadata_storage, object);
operations_to_execute.emplace_back(std::move(write_operation));

View File

@ -206,10 +206,6 @@ public:
virtual bool supportParallelWrite() const { return false; }
virtual ReadSettings getAdjustedSettingsFromMetadataFile(const ReadSettings & settings, const std::string & /* path */) const { return settings; }
virtual WriteSettings getAdjustedSettingsFromMetadataFile(const WriteSettings & settings, const std::string & /* path */) const { return settings; }
virtual ReadSettings patchSettings(const ReadSettings & read_settings) const;
virtual WriteSettings patchSettings(const WriteSettings & write_settings) const;

View File

@ -141,7 +141,7 @@ StoredObjects MetadataStorageFromDisk::getStorageObjects(const std::string & pat
objects.reserve(keys_with_meta.size());
for (const auto & [object_key, object_meta] : keys_with_meta)
{
objects.emplace_back(object_key.serialize(), object_meta.size_bytes, path);
objects.emplace_back(object_key.serialize(), path, object_meta.size_bytes);
}
return objects;

View File

@ -106,7 +106,7 @@ StoredObjects MetadataStorageFromPlainObjectStorage::getStorageObjects(const std
{
size_t object_size = getFileSize(path);
auto object_key = object_storage->generateObjectKeyForPath(path);
return {StoredObject(object_key.serialize(), object_size, path)};
return {StoredObject(object_key.serialize(), path, object_size)};
}
const IMetadataStorage & MetadataStorageFromPlainObjectStorageTransaction::getStorageForNonTransactionalReads() const

View File

@ -15,6 +15,8 @@
#include <IO/S3/copyS3File.h>
#include <Interpreters/Context.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <IO/S3/BlobStorageLogWriter.h>
#include <Disks/ObjectStorages/S3/diskSettings.h>
#include <Common/getRandomASCIIString.h>
@ -249,12 +251,18 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
if (write_settings.s3_allow_parallel_part_upload)
scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "VFSWrite");
auto blob_storage_log = BlobStorageLogWriter::create(disk_name);
if (blob_storage_log)
blob_storage_log->local_path = object.local_path;
return std::make_unique<WriteBufferFromS3>(
client.get(),
bucket,
object.remote_path,
buf_size,
settings_ptr->request_settings,
std::move(blob_storage_log),
attributes,
std::move(scheduler),
disk_write_settings);
@ -321,6 +329,10 @@ void S3ObjectStorage::removeObjectImpl(const StoredObject & object, bool if_exis
request.SetBucket(bucket);
request.SetKey(object.remote_path);
auto outcome = client.get()->DeleteObject(request);
if (auto blob_storage_log = BlobStorageLogWriter::create(disk_name))
blob_storage_log->addEvent(BlobStorageLogElement::EventType::Delete,
bucket, object.remote_path, object.local_path, object.bytes_size,
outcome.IsSuccess() ? nullptr : &outcome.GetError());
throwIfUnexpectedError(outcome, if_exists);
@ -344,6 +356,7 @@ void S3ObjectStorage::removeObjectsImpl(const StoredObjects & objects, bool if_e
size_t chunk_size_limit = settings_ptr->objects_chunk_size_to_delete;
size_t current_position = 0;
auto blob_storage_log = BlobStorageLogWriter::create(disk_name);
while (current_position < objects.size())
{
std::vector<Aws::S3::Model::ObjectIdentifier> current_chunk;
@ -369,9 +382,18 @@ void S3ObjectStorage::removeObjectsImpl(const StoredObjects & objects, bool if_e
request.SetDelete(delkeys);
auto outcome = client.get()->DeleteObjects(request);
throwIfUnexpectedError(outcome, if_exists);
if (blob_storage_log)
{
const auto * outcome_error = outcome.IsSuccess() ? nullptr : &outcome.GetError();
auto time_now = std::chrono::system_clock::now();
for (const auto & object : objects)
blob_storage_log->addEvent(BlobStorageLogElement::EventType::Delete,
bucket, object.remote_path, object.local_path, object.bytes_size,
outcome_error, time_now);
}
LOG_DEBUG(log, "Objects with paths [{}] were removed from S3", keys);
throwIfUnexpectedError(outcome, if_exists);
}
}
}
@ -450,6 +472,7 @@ void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
object_to.remote_path,
settings_ptr->request_settings,
patchSettings(read_settings),
BlobStorageLogWriter::create(disk_name),
object_to_attributes,
scheduler,
/* for_disk_s3= */ true);
@ -478,6 +501,7 @@ void S3ObjectStorage::copyObject( // NOLINT
object_to.remote_path,
settings_ptr->request_settings,
patchSettings(read_settings),
BlobStorageLogWriter::create(disk_name),
object_to_attributes,
scheduler,
/* for_disk_s3= */ true);
@ -520,7 +544,7 @@ std::unique_ptr<IObjectStorage> S3ObjectStorage::cloneObjectStorage(
return std::make_unique<S3ObjectStorage>(
std::move(new_client), std::move(new_s3_settings),
version_id, s3_capabilities, new_namespace,
endpoint, object_key_prefix);
endpoint, object_key_prefix, disk_name);
}
ObjectStorageKey S3ObjectStorage::generateObjectKeyForPath(const std::string &) const

View File

@ -50,9 +50,11 @@ private:
const S3Capabilities & s3_capabilities_,
String bucket_,
String connection_string,
String object_key_prefix_)
String object_key_prefix_,
const String & disk_name_)
: bucket(std::move(bucket_))
, object_key_prefix(std::move(object_key_prefix_))
, disk_name(disk_name_)
, client(std::move(client_))
, s3_settings(std::move(s3_settings_))
, s3_capabilities(s3_capabilities_)
@ -173,7 +175,7 @@ private:
private:
std::string bucket;
String object_key_prefix;
std::string disk_name;
MultiVersion<S3::Client> client;
MultiVersion<S3ObjectStorageSettings> s3_settings;

View File

@ -116,6 +116,7 @@ void registerDiskS3(DiskFactory & factory, bool global_skip_access_check)
MetadataStoragePtr metadata_storage;
auto settings = getSettings(config, config_prefix, context);
auto client = getClient(config, config_prefix, context, *settings);
if (type == "s3_plain")
{
/// send_metadata changes the filenames (includes revision), while
@ -127,14 +128,18 @@ void registerDiskS3(DiskFactory & factory, bool global_skip_access_check)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "s3_plain does not support send_metadata");
s3_storage = std::make_shared<S3PlainObjectStorage>(
std::move(client), std::move(settings), uri.version_id, s3_capabilities, uri.bucket, uri.endpoint, uri.key);
std::move(client), std::move(settings),
uri.version_id, s3_capabilities,
uri.bucket, uri.endpoint, uri.key, name);
metadata_storage = std::make_shared<MetadataStorageFromPlainObjectStorage>(s3_storage, uri.key);
}
else
{
s3_storage = std::make_shared<S3ObjectStorage>(
std::move(client), std::move(settings), uri.version_id, s3_capabilities, uri.bucket, uri.endpoint, uri.key);
std::move(client), std::move(settings),
uri.version_id, s3_capabilities,
uri.bucket, uri.endpoint, uri.key, name);
auto [metadata_path, metadata_disk] = prepareForLocalMetadata(name, config, config_prefix, context);
metadata_storage = std::make_shared<MetadataStorageFromDisk>(metadata_disk, uri.key);
}

View File

@ -19,23 +19,10 @@ struct StoredObject
uint64_t bytes_size = 0;
StoredObject() = default;
explicit StoredObject(String remote_path_)
: remote_path(std::move(remote_path_))
{}
StoredObject(
String remote_path_,
uint64_t bytes_size_)
: remote_path(std::move(remote_path_))
, bytes_size(bytes_size_)
{}
StoredObject(
String remote_path_,
uint64_t bytes_size_,
String local_path_)
explicit StoredObject(
const String & remote_path_ = "",
const String & local_path_ = "",
uint64_t bytes_size_ = 0)
: remote_path(std::move(remote_path_))
, local_path(std::move(local_path_))
, bytes_size(bytes_size_)

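The constructor change above collapses the three StoredObject constructors into one with defaulted parameters and moves local_path ahead of bytes_size. A hedged sketch of how a call site adapts (the names here are placeholders, not taken from this hunk):

    // Old order: StoredObject(remote_path, bytes_size, local_path)
    // New order: StoredObject(remote_path, local_path, bytes_size)
    StoredObject object(remote_file, /* local_path */ "", /* bytes_size */ file_size);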
View File

@ -87,7 +87,7 @@ StoredObjects MetadataStorageFromStaticFilesWebServer::getStorageObjects(const s
remote_path = remote_path.substr(object_storage.url.size());
std::shared_lock shared_lock(object_storage.metadata_mutex);
return {StoredObject(remote_path, object_storage.files.at(path).size, path)};
return {StoredObject(remote_path, path, object_storage.files.at(path).size)};
}
std::vector<std::string> MetadataStorageFromStaticFilesWebServer::listDirectory(const std::string & path) const

View File

@ -96,6 +96,10 @@ NamesAndTypesList collectNested(const NamesAndTypesList & names_and_types, bool
nested[field_name].emplace_back(nested_name, type);
}
/// Collect nested recursively.
for (auto & [field_name, elements] : nested)
elements = collectNested(elements, allow_split_by_underscore, format_name);
for (const auto & [field_name, elements]: nested)
result.emplace_back(field_name, std::make_shared<DataTypeTuple>(elements.getTypes(), elements.getNames()));

View File

@ -207,6 +207,8 @@ public:
FunctionBasePtr buildImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & return_type) const override
{
if (arguments.size() == 1)
return FunctionFactory::instance().getImpl("toString", context)->build(arguments);
if (std::ranges::all_of(arguments, [](const auto & elem) { return isArray(elem.type); }))
return FunctionFactory::instance().getImpl("arrayConcat", context)->build(arguments);
if (std::ranges::all_of(arguments, [](const auto & elem) { return isMap(elem.type); }))
@ -221,10 +223,10 @@ public:
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (arguments.size() < 2)
if (arguments.empty())
throw Exception(
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Number of arguments for function {} doesn't match: passed {}, should be at least 2.",
"Number of arguments for function {} doesn't match: passed {}, should be at least 1.",
getName(),
arguments.size());

View File

@ -84,6 +84,10 @@ public:
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
auto col_in_untyped = arguments[0].column;
if (input_rows_count == 0)
return col_in_untyped;
const double inverse_probability = assert_cast<const ColumnConst &>(*arguments[1].column).getValue<double>();
if (inverse_probability < 0.0 || 1.0 < inverse_probability)

View File

@ -191,6 +191,12 @@ private:
memory.resize(2 * prev_size + 1);
Base::set(memory.data() + prev_size, memory.size() - prev_size, 0);
}
void finalizeImpl() final
{
/// There is no need to allocate twice as much memory at finalize(),
/// so make this call a no-op and do not call nextImpl() here.
}
};
}

View File

@ -2,6 +2,60 @@
#include <Common/Exception.h>
namespace
{
using namespace DB;
/// SinkToOut provides a safe way to write directly into a buffer's memory.
/// When out_ has at least guaranteed_capacity bytes available, SinkToOut points directly at out_'s memory.
/// Otherwise the writes go to temporary memory, which is copied into out_ when finalize() is called.
class SinkToOut
{
public:
SinkToOut(WriteBuffer * out_, Memory<> & mem_, size_t guaranteed_capacity)
: sink(out_)
, tmp_out(mem_)
, cur_out(sink)
{
chassert(sink);
if (sink->available() < guaranteed_capacity)
{
mem_.resize(guaranteed_capacity);
cur_out = &tmp_out;
}
}
size_t getCapacity()
{
return cur_out->available();
}
BufferBase::Position getPosition()
{
return cur_out->position();
}
void advancePosition(size_t size)
{
chassert(size <= cur_out->available());
cur_out->position() += size;
}
void finalize()
{
tmp_out.finalize();
sink->write(tmp_out.buffer().begin(), tmp_out.count());
}
private:
WriteBuffer * sink;
BufferWithOutsideMemory<WriteBuffer> tmp_out;
WriteBuffer * cur_out;
};
}
namespace DB
{
namespace ErrorCodes
@ -12,10 +66,8 @@ namespace ErrorCodes
Lz4DeflatingWriteBuffer::Lz4DeflatingWriteBuffer(
std::unique_ptr<WriteBuffer> out_, int compression_level, size_t buf_size, char * existing_memory, size_t alignment)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
, in_data(nullptr)
, out_data(nullptr)
, in_capacity(0)
, out_capacity(0)
, tmp_memory(buf_size)
{
kPrefs = {
{LZ4F_max256KB,
@ -36,8 +88,8 @@ Lz4DeflatingWriteBuffer::Lz4DeflatingWriteBuffer(
if (LZ4F_isError(ret))
throw Exception(
ErrorCodes::LZ4_ENCODER_FAILED,
"creation of LZ4 compression context failed. LZ4F version: {}",
LZ4F_VERSION);
"creation of LZ4 compression context failed. LZ4F version: {}, error: {}",
LZ4F_VERSION, LZ4F_getErrorName(ret));
}
Lz4DeflatingWriteBuffer::~Lz4DeflatingWriteBuffer()
@ -51,110 +103,81 @@ void Lz4DeflatingWriteBuffer::nextImpl()
if (!offset())
return;
in_data = reinterpret_cast<void *>(working_buffer.begin());
in_capacity = offset();
out_capacity = out->buffer().end() - out->position();
out_data = reinterpret_cast<void *>(out->position());
try
if (first_time)
{
if (first_time)
auto sink = SinkToOut(out.get(), tmp_memory, LZ4F_HEADER_SIZE_MAX);
chassert(sink.getCapacity() >= LZ4F_HEADER_SIZE_MAX);
/// write frame header and check for errors
size_t header_size = LZ4F_compressBegin(
ctx, sink.getPosition(), sink.getCapacity(), &kPrefs);
if (LZ4F_isError(header_size))
throw Exception(
ErrorCodes::LZ4_ENCODER_FAILED,
"LZ4 failed to start stream encoding. LZ4F version: {}, error: {}",
LZ4F_VERSION, LZ4F_getErrorName(header_size));
sink.advancePosition(header_size);
sink.finalize();
first_time = false;
}
auto * in_data = working_buffer.begin();
auto in_capacity = offset();
while (in_capacity > 0)
{
/// Ensure that there is enough space for compressed block of minimal size
size_t min_compressed_block_size = LZ4F_compressBound(1, &kPrefs);
auto sink = SinkToOut(out.get(), tmp_memory, min_compressed_block_size);
chassert(sink.getCapacity() >= min_compressed_block_size);
/// LZ4F_compressUpdate compresses the whole input buffer at once, so we need to shrink it manually
size_t cur_buffer_size = in_capacity;
if (sink.getCapacity() >= min_compressed_block_size) /// We cannot shrink the input buffer if it's already too small.
{
if (out_capacity < LZ4F_HEADER_SIZE_MAX)
{
out->next();
out_capacity = out->buffer().end() - out->position();
out_data = reinterpret_cast<void *>(out->position());
}
/// write frame header and check for errors
size_t header_size = LZ4F_compressBegin(ctx, out_data, out_capacity, &kPrefs);
if (LZ4F_isError(header_size))
throw Exception(
ErrorCodes::LZ4_ENCODER_FAILED,
"LZ4 failed to start stream encoding. LZ4F version: {}",
LZ4F_VERSION);
out_capacity -= header_size;
out->position() = out->buffer().end() - out_capacity;
out_data = reinterpret_cast<void *>(out->position());
first_time = false;
while (sink.getCapacity() < LZ4F_compressBound(cur_buffer_size, &kPrefs))
cur_buffer_size /= 2;
}
do
{
/// Ensure that there is enough space for compressed block of minimal size
size_t min_compressed_block_size = LZ4F_compressBound(1, &kPrefs);
if (out_capacity < min_compressed_block_size)
{
out->next();
out_capacity = out->buffer().end() - out->position();
out_data = reinterpret_cast<void *>(out->position());
}
size_t compressed_size = LZ4F_compressUpdate(
ctx, sink.getPosition(), sink.getCapacity(), in_data, cur_buffer_size, nullptr);
/// LZ4F_compressUpdate compresses the whole input buffer at once, so we need to shrink it manually
size_t cur_buffer_size = in_capacity;
if (out_capacity >= min_compressed_block_size) /// We cannot shrink the input buffer if it's already too small.
{
while (out_capacity < LZ4F_compressBound(cur_buffer_size, &kPrefs))
cur_buffer_size /= 2;
}
if (LZ4F_isError(compressed_size))
throw Exception(
ErrorCodes::LZ4_ENCODER_FAILED,
"LZ4 failed to encode stream. LZ4F version: {}, error {}, out_capacity {}",
LZ4F_VERSION, LZ4F_getErrorName(compressed_size), sink.getCapacity());
size_t compressed_size = LZ4F_compressUpdate(ctx, out_data, out_capacity, in_data, cur_buffer_size, nullptr);
in_capacity -= cur_buffer_size;
in_data += cur_buffer_size;
if (LZ4F_isError(compressed_size))
throw Exception(
ErrorCodes::LZ4_ENCODER_FAILED,
"LZ4 failed to encode stream. LZ4F version: {}",
LZ4F_VERSION);
in_capacity -= cur_buffer_size;
in_data = reinterpret_cast<void *>(working_buffer.end() - in_capacity);
out_capacity -= compressed_size;
out->position() = out->buffer().end() - out_capacity;
out_data = reinterpret_cast<void *>(out->position());
}
while (in_capacity > 0);
sink.advancePosition(compressed_size);
sink.finalize();
}
catch (...)
{
out->position() = out->buffer().begin();
throw;
}
out->next();
out_capacity = out->buffer().end() - out->position();
}
void Lz4DeflatingWriteBuffer::finalizeBefore()
{
next();
out_capacity = out->buffer().end() - out->position();
out_data = reinterpret_cast<void *>(out->position());
if (out_capacity < LZ4F_compressBound(0, &kPrefs))
{
out->next();
out_capacity = out->buffer().end() - out->position();
out_data = reinterpret_cast<void *>(out->position());
}
auto suffix_size = LZ4F_compressBound(0, &kPrefs);
auto sink = SinkToOut(out.get(), tmp_memory, suffix_size);
chassert(sink.getCapacity() >= suffix_size);
/// compression end
size_t end_size = LZ4F_compressEnd(ctx, out_data, out_capacity, nullptr);
size_t end_size = LZ4F_compressEnd(ctx, sink.getPosition(), sink.getCapacity(), nullptr);
if (LZ4F_isError(end_size))
throw Exception(
ErrorCodes::LZ4_ENCODER_FAILED,
"LZ4 failed to end stream encoding. LZ4F version: {}",
LZ4F_VERSION);
"LZ4 failed to end stream encoding. LZ4F version: {}, error {}, out_capacity {}",
LZ4F_VERSION, LZ4F_getErrorName(end_size), sink.getCapacity());
out_capacity -= end_size;
out->position() = out->buffer().end() - out_capacity;
out_data = reinterpret_cast<void *>(out->position());
sink.advancePosition(end_size);
sink.finalize();
}
void Lz4DeflatingWriteBuffer::finalizeAfter()

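A minimal sketch of the SinkToOut pattern that the rewritten nextImpl()/finalizeBefore() follow; compress_step() and needed_capacity are stand-ins for the LZ4F_* calls and bounds used above, not names from this diff:

    Memory<> scratch;                                            // reused temporary memory
    auto sink = SinkToOut(out.get(), scratch, needed_capacity);
    chassert(sink.getCapacity() >= needed_capacity);
    size_t written = compress_step(sink.getPosition(), sink.getCapacity());  // hypothetical
    sink.advancePosition(written);
    sink.finalize();   // copies any data staged in `scratch` into the underlying buffer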
View File

@ -32,11 +32,7 @@ private:
LZ4F_preferences_t kPrefs; /// NOLINT
LZ4F_compressionContext_t ctx;
void * in_data;
void * out_data;
size_t in_capacity;
size_t out_capacity;
Memory<> tmp_memory;
bool first_time = true;
};

View File

@ -100,6 +100,7 @@ struct ReadSettings
bool enable_filesystem_cache_log = false;
/// Don't populate cache when the read is not part of query execution (e.g. background thread).
bool avoid_readthrough_cache_outside_query_context = true;
size_t filesystem_cache_segments_batch_size = 20;
size_t filesystem_cache_max_download_size = (128UL * 1024 * 1024 * 1024);
bool skip_download_if_exceeds_query_cache = true;

View File

@ -0,0 +1,72 @@
#include <IO/S3/BlobStorageLogWriter.h>
#if USE_AWS_S3
#include <base/getThreadId.h>
#include <Common/setThreadName.h>
#include <IO/S3/Client.h>
#include <Interpreters/Context.h>
namespace DB
{
void BlobStorageLogWriter::addEvent(
BlobStorageLogElement::EventType event_type,
const String & bucket,
const String & remote_path,
const String & local_path_,
size_t data_size,
const Aws::S3::S3Error * error,
BlobStorageLogElement::EventTime time_now)
{
if (!log)
return;
if (!time_now.time_since_epoch().count())
time_now = std::chrono::system_clock::now();
BlobStorageLogElement element;
element.event_type = event_type;
element.query_id = query_id;
element.thread_id = getThreadId();
element.thread_name = getThreadName();
element.disk_name = disk_name;
element.bucket = bucket;
element.remote_path = remote_path;
element.local_path = local_path_.empty() ? local_path : local_path_;
element.data_size = data_size;
if (error)
{
element.error_code = static_cast<Int32>(error->GetErrorType());
element.error_message = error->GetMessage();
}
element.event_time = time_now;
log->add(element);
}
BlobStorageLogWriterPtr BlobStorageLogWriter::create(const String & disk_name)
{
#ifndef CLICKHOUSE_KEEPER_STANDALONE_BUILD /// Keeper standalone build doesn't have a context
if (auto blob_storage_log = Context::getGlobalContextInstance()->getBlobStorageLog())
{
auto log_writer = std::make_shared<BlobStorageLogWriter>(std::move(blob_storage_log));
log_writer->disk_name = disk_name;
if (CurrentThread::isInitialized() && CurrentThread::get().getQueryContext())
log_writer->query_id = CurrentThread::getQueryId();
return log_writer;
}
#endif
return {};
}
}
#endif

View File

@ -0,0 +1,57 @@
#pragma once
#include <Interpreters/BlobStorageLog.h>
#include "config.h"
#if USE_AWS_S3
namespace Aws::S3
{
class S3Error;
}
namespace DB
{
using BlobStorageLogPtr = std::shared_ptr<BlobStorageLog>;
class BlobStorageLogWriter;
using BlobStorageLogWriterPtr = std::shared_ptr<BlobStorageLogWriter>;
/// Helper class to write events to BlobStorageLog
/// Can additionally hold some context information
class BlobStorageLogWriter : private boost::noncopyable
{
public:
BlobStorageLogWriter() = default;
explicit BlobStorageLogWriter(BlobStorageLogPtr log_)
: log(std::move(log_))
{}
void addEvent(
BlobStorageLogElement::EventType event_type,
const String & bucket,
const String & remote_path,
const String & local_path,
size_t data_size,
const Aws::S3::S3Error * error,
BlobStorageLogElement::EventTime time_now = {});
bool isInitialized() const { return log != nullptr; }
/// Optional context information
String disk_name;
String query_id;
String local_path;
static BlobStorageLogWriterPtr create(const String & disk_name = "");
private:
BlobStorageLogPtr log;
};
}
#endif

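A usage sketch that mirrors the call sites in the S3 hunks above; bucket, remote_path and object_size are placeholders:

    if (auto blob_storage_log = BlobStorageLogWriter::create(disk_name))
        blob_storage_log->addEvent(
            BlobStorageLogElement::EventType::Delete,
            bucket, remote_path, /* local_path_ */ "", /* data_size */ object_size,
            outcome.IsSuccess() ? nullptr : &outcome.GetError());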
View File

@ -4,6 +4,7 @@
#include <Common/ProfileEvents.h>
#include <Common/typeid_cast.h>
#include <IO/S3/BlobStorageLogWriter.h>
#include <Interpreters/Context.h>
#include <IO/LimitSeekableReadBuffer.h>
#include <IO/S3/getObjectInfo.h>
@ -59,6 +60,7 @@ namespace
const std::optional<std::map<String, String>> & object_metadata_,
ThreadPoolCallbackRunner<void> schedule_,
bool for_disk_s3_,
BlobStorageLogWriterPtr blob_storage_log_,
const Poco::Logger * log_)
: client_ptr(client_ptr_)
, dest_bucket(dest_bucket_)
@ -68,6 +70,7 @@ namespace
, object_metadata(object_metadata_)
, schedule(schedule_)
, for_disk_s3(for_disk_s3_)
, blob_storage_log(blob_storage_log_)
, log(log_)
{
}
@ -83,6 +86,7 @@ namespace
const std::optional<std::map<String, String>> & object_metadata;
ThreadPoolCallbackRunner<void> schedule;
bool for_disk_s3;
BlobStorageLogWriterPtr blob_storage_log;
const Poco::Logger * log;
struct UploadPartTask
@ -132,6 +136,10 @@ namespace
ProfileEvents::increment(ProfileEvents::DiskS3CreateMultipartUpload);
auto outcome = client_ptr->CreateMultipartUpload(request);
if (blob_storage_log)
blob_storage_log->addEvent(BlobStorageLogElement::EventType::MultiPartUploadCreate,
dest_bucket, dest_key, /* local_path_ */ {}, /* data_size */ 0,
outcome.IsSuccess() ? nullptr : &outcome.GetError());
if (outcome.IsSuccess())
{
@ -178,6 +186,16 @@ namespace
auto outcome = client_ptr->CompleteMultipartUpload(request);
if (blob_storage_log)
blob_storage_log->addEvent(BlobStorageLogElement::EventType::MultiPartUploadComplete,
dest_bucket, dest_key, /* local_path_ */ {}, /* data_size */ 0,
outcome.IsSuccess() ? nullptr : &outcome.GetError());
if (outcome.IsSuccess())
{
LOG_TRACE(log, "Multipart upload has completed. Bucket: {}, Key: {}, Upload_id: {}, Parts: {}", dest_bucket, dest_key, multipart_upload_id, part_tags.size());
@ -206,7 +224,12 @@ namespace
abort_request.SetBucket(dest_bucket);
abort_request.SetKey(dest_key);
abort_request.SetUploadId(multipart_upload_id);
client_ptr->AbortMultipartUpload(abort_request);
auto outcome = client_ptr->AbortMultipartUpload(abort_request);
if (blob_storage_log)
blob_storage_log->addEvent(BlobStorageLogElement::EventType::MultiPartUploadAbort,
dest_bucket, dest_key, /* local_path_ */ {}, /* data_size */ 0,
outcome.IsSuccess() ? nullptr : &outcome.GetError());
multipart_upload_aborted = true;
}
@ -435,8 +458,9 @@ namespace
const S3Settings::RequestSettings & request_settings_,
const std::optional<std::map<String, String>> & object_metadata_,
ThreadPoolCallbackRunner<void> schedule_,
bool for_disk_s3_)
: UploadHelper(client_ptr_, dest_bucket_, dest_key_, request_settings_, object_metadata_, schedule_, for_disk_s3_, &Poco::Logger::get("copyDataToS3File"))
bool for_disk_s3_,
BlobStorageLogWriterPtr blob_storage_log_)
: UploadHelper(client_ptr_, dest_bucket_, dest_key_, request_settings_, object_metadata_, schedule_, for_disk_s3_, blob_storage_log_, &Poco::Logger::get("copyDataToS3File"))
, create_read_buffer(create_read_buffer_)
, offset(offset_)
, size(size_)
@ -500,6 +524,10 @@ namespace
Stopwatch watch;
auto outcome = client_ptr->PutObject(request);
watch.stop();
if (blob_storage_log)
blob_storage_log->addEvent(BlobStorageLogElement::EventType::Upload,
dest_bucket, dest_key, /* local_path_ */ {}, size,
outcome.IsSuccess() ? nullptr : &outcome.GetError());
if (outcome.IsSuccess())
{
@ -581,6 +609,11 @@ namespace
ProfileEvents::increment(ProfileEvents::DiskS3UploadPart);
auto outcome = client_ptr->UploadPart(req);
if (blob_storage_log)
blob_storage_log->addEvent(BlobStorageLogElement::EventType::MultiPartUploadWrite,
dest_bucket, dest_key, /* local_path_ */ {}, size,
outcome.IsSuccess() ? nullptr : &outcome.GetError());
if (!outcome.IsSuccess())
{
abortMultipartUpload();
@ -608,8 +641,9 @@ namespace
const ReadSettings & read_settings_,
const std::optional<std::map<String, String>> & object_metadata_,
ThreadPoolCallbackRunner<void> schedule_,
bool for_disk_s3_)
: UploadHelper(client_ptr_, dest_bucket_, dest_key_, request_settings_, object_metadata_, schedule_, for_disk_s3_, &Poco::Logger::get("copyS3File"))
bool for_disk_s3_,
BlobStorageLogWriterPtr blob_storage_log_)
: UploadHelper(client_ptr_, dest_bucket_, dest_key_, request_settings_, object_metadata_, schedule_, for_disk_s3_, blob_storage_log_, &Poco::Logger::get("copyS3File"))
, src_bucket(src_bucket_)
, src_key(src_key_)
, offset(src_offset_)
@ -712,6 +746,7 @@ namespace
dest_bucket,
dest_key,
request_settings,
blob_storage_log,
object_metadata,
schedule,
for_disk_s3);
@ -803,11 +838,12 @@ void copyDataToS3File(
const String & dest_bucket,
const String & dest_key,
const S3Settings::RequestSettings & settings,
BlobStorageLogWriterPtr blob_storage_log,
const std::optional<std::map<String, String>> & object_metadata,
ThreadPoolCallbackRunner<void> schedule,
bool for_disk_s3)
{
CopyDataToFileHelper helper{create_read_buffer, offset, size, dest_s3_client, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_s3};
CopyDataToFileHelper helper{create_read_buffer, offset, size, dest_s3_client, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_s3, blob_storage_log};
helper.performCopy();
}
@ -822,13 +858,14 @@ void copyS3File(
const String & dest_key,
const S3Settings::RequestSettings & settings,
const ReadSettings & read_settings,
BlobStorageLogWriterPtr blob_storage_log,
const std::optional<std::map<String, String>> & object_metadata,
ThreadPoolCallbackRunner<void> schedule,
bool for_disk_s3)
{
if (settings.allow_native_copy)
{
CopyFileHelper helper{s3_client, src_bucket, src_key, src_offset, src_size, dest_bucket, dest_key, settings, read_settings, object_metadata, schedule, for_disk_s3};
CopyFileHelper helper{s3_client, src_bucket, src_key, src_offset, src_size, dest_bucket, dest_key, settings, read_settings, object_metadata, schedule, for_disk_s3, blob_storage_log};
helper.performCopy();
}
else
@ -837,7 +874,7 @@ void copyS3File(
{
return std::make_unique<ReadBufferFromS3>(s3_client, src_bucket, src_key, "", settings, read_settings);
};
copyDataToS3File(create_read_buffer, src_offset, src_size, s3_client, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_s3);
copyDataToS3File(create_read_buffer, src_offset, src_size, s3_client, dest_bucket, dest_key, settings, blob_storage_log, object_metadata, schedule, for_disk_s3);
}
}

View File

@ -6,6 +6,7 @@
#include <Storages/StorageS3Settings.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <IO/S3/BlobStorageLogWriter.h>
#include <base/types.h>
#include <functional>
#include <memory>
@ -38,6 +39,7 @@ void copyS3File(
const String & dest_key,
const S3Settings::RequestSettings & settings,
const ReadSettings & read_settings,
BlobStorageLogWriterPtr blob_storage_log,
const std::optional<std::map<String, String>> & object_metadata = std::nullopt,
ThreadPoolCallbackRunner<void> schedule_ = {},
bool for_disk_s3 = false);
@ -55,6 +57,7 @@ void copyDataToS3File(
const String & dest_bucket,
const String & dest_key,
const S3Settings::RequestSettings & settings,
BlobStorageLogWriterPtr blob_storage_log,
const std::optional<std::map<String, String>> & object_metadata = std::nullopt,
ThreadPoolCallbackRunner<void> schedule_ = {},
bool for_disk_s3 = false);

View File

@ -95,7 +95,8 @@ void doWriteRequest(std::shared_ptr<const DB::S3::Client> client, const DB::S3::
uri.bucket,
uri.key,
DBMS_DEFAULT_BUFFER_SIZE,
request_settings
request_settings,
{}
);
write_buffer.write('\0'); // doesn't matter what we write here, just needs to be something

View File

@ -34,7 +34,12 @@ public:
void set(Position ptr, size_t size) { BufferBase::set(ptr, size, 0); }
/** write the data in the buffer (from the beginning of the buffer to the current position);
* set the position to the beginning; throw an exception, if something is wrong
* set the position to the beginning; throw an exception, if something is wrong.
*
* A call to next() does not guarantee that the buffer's capacity is regained afterwards.
* Some buffers (e.g. WriteBufferFromS3) flush their data only after a certain amount of data has been consumed.
* If a direct write is performed into [position(), buffer().end()) and its length is not enough,
* you need to fill the buffer first (e.g. with a write() call); after that the capacity is regained.
*/
inline void next()
{

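The extended comment describes the direct-write contract; a hedged sketch of that pattern (appendDirect is an illustrative helper, not part of this diff; assumes <cstring> and IO/WriteBuffer.h are included):

    void appendDirect(DB::WriteBuffer & out, const char * src, size_t n)
    {
        if (out.available() >= n)
        {
            memcpy(out.position(), src, n);   // direct write into [position(), buffer().end())
            out.position() += n;
        }
        else
            out.write(src, n);                // not enough room: let the buffer flush and refill
    }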
View File

@ -16,7 +16,7 @@
#include <IO/S3Common.h>
#include <IO/S3/Requests.h>
#include <IO/S3/getObjectInfo.h>
#include <Interpreters/Context.h>
#include <IO/S3/BlobStorageLogWriter.h>
#include <aws/s3/model/StorageClass.h>
@ -81,6 +81,7 @@ WriteBufferFromS3::WriteBufferFromS3(
const String & key_,
size_t buf_size_,
const S3Settings::RequestSettings & request_settings_,
BlobStorageLogWriterPtr blob_log_,
std::optional<std::map<String, String>> object_metadata_,
ThreadPoolCallbackRunner<void> schedule_,
const WriteSettings & write_settings_)
@ -98,6 +99,7 @@ WriteBufferFromS3::WriteBufferFromS3(
std::move(schedule_),
upload_settings.max_inflight_parts_for_one_file,
limitedLog))
, blob_log(std::move(blob_log_))
{
LOG_TRACE(limitedLog, "Create WriteBufferFromS3, {}", getShortLogDetails());
@ -378,6 +380,9 @@ void WriteBufferFromS3::createMultipartUpload()
watch.stop();
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds());
if (blob_log)
blob_log->addEvent(BlobStorageLogElement::EventType::MultiPartUploadCreate, bucket, key, {}, 0,
outcome.IsSuccess() ? nullptr : &outcome.GetError());
if (!outcome.IsSuccess())
{
@ -386,6 +391,7 @@ void WriteBufferFromS3::createMultipartUpload()
}
multipart_upload_id = outcome.GetResult().GetUploadId();
LOG_TRACE(limitedLog, "Multipart upload has been created. {}", getShortLogDetails());
}
@ -414,6 +420,10 @@ void WriteBufferFromS3::abortMultipartUpload()
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds());
if (blob_log)
blob_log->addEvent(BlobStorageLogElement::EventType::MultiPartUploadAbort, bucket, key, {}, 0,
outcome.IsSuccess() ? nullptr : &outcome.GetError());
if (!outcome.IsSuccess())
{
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1);
@ -508,6 +518,13 @@ void WriteBufferFromS3::writePart(WriteBufferFromS3::PartData && data)
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds());
if (blob_log)
{
blob_log->addEvent(BlobStorageLogElement::EventType::MultiPartUploadWrite,
/* bucket = */ bucket, /* remote_path = */ key, /* local_path = */ {}, /* data_size */ data_size,
outcome.IsSuccess() ? nullptr : &outcome.GetError());
}
if (!outcome.IsSuccess())
{
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1);
@ -569,6 +586,10 @@ void WriteBufferFromS3::completeMultipartUpload()
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds());
if (blob_log)
blob_log->addEvent(BlobStorageLogElement::EventType::MultiPartUploadComplete, bucket, key, {}, 0,
outcome.IsSuccess() ? nullptr : &outcome.GetError());
if (outcome.IsSuccess())
{
LOG_TRACE(limitedLog, "Multipart upload has completed. {}, Parts: {}", getShortLogDetails(), multipart_tags.size());
@ -650,6 +671,9 @@ void WriteBufferFromS3::makeSinglepartUpload(WriteBufferFromS3::PartData && data
rlock.unlock();
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds());
if (blob_log)
blob_log->addEvent(BlobStorageLogElement::EventType::Upload, bucket, key, {}, request.GetContentLength(),
outcome.IsSuccess() ? nullptr : &outcome.GetError());
if (outcome.IsSuccess())
{

View File

@ -11,6 +11,7 @@
#include <IO/WriteSettings.h>
#include <Storages/StorageS3Settings.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <IO/S3/BlobStorageLogWriter.h>
#include <memory>
#include <vector>
@ -34,6 +35,7 @@ public:
const String & key_,
size_t buf_size_,
const S3Settings::RequestSettings & request_settings_,
BlobStorageLogWriterPtr blob_log_,
std::optional<std::map<String, String>> object_metadata_ = std::nullopt,
ThreadPoolCallbackRunner<void> schedule_ = {},
const WriteSettings & write_settings_ = {});
@ -118,6 +120,8 @@ private:
class TaskTracker;
std::unique_ptr<TaskTracker> task_tracker;
BlobStorageLogWriterPtr blob_log;
};
}

View File

@ -554,6 +554,7 @@ public:
file_name,
DBMS_DEFAULT_BUFFER_SIZE,
request_settings,
nullptr,
std::nullopt,
getAsyncPolicy().getScheduler());
}
@ -1214,7 +1215,7 @@ TEST_F(WBS3Test, ReadBeyondLastOffset) {
/// create encrypted file reader
auto cache_log = std::shared_ptr<FilesystemCacheLog>();
const StoredObjects objects = { StoredObject(remote_file, data.size() + FileEncryption::Header::kSize) };
const StoredObjects objects = { StoredObject(remote_file, /* local_path */ "", data.size() + FileEncryption::Header::kSize) };
auto async_read_counters = std::make_shared<AsyncReadCounters>();
auto prefetch_log = std::shared_ptr<FilesystemReadPrefetchesLog>();

View File

@ -0,0 +1,92 @@
#include <Interpreters/BlobStorageLog.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeDate.h>
namespace DB
{
NamesAndTypesList BlobStorageLogElement::getNamesAndTypes()
{
auto event_enum_type = std::make_shared<DataTypeEnum8>(
DataTypeEnum8::Values{
{"Upload", static_cast<Int8>(EventType::Upload)},
{"Delete", static_cast<Int8>(EventType::Delete)},
{"MultiPartUploadCreate", static_cast<Int8>(EventType::MultiPartUploadCreate)},
{"MultiPartUploadWrite", static_cast<Int8>(EventType::MultiPartUploadWrite)},
{"MultiPartUploadComplete", static_cast<Int8>(EventType::MultiPartUploadComplete)},
{"MultiPartUploadAbort", static_cast<Int8>(EventType::MultiPartUploadAbort)},
});
return {
{"event_date", std::make_shared<DataTypeDate>()},
{"event_time", std::make_shared<DataTypeDateTime>()},
{"event_time_microseconds", std::make_shared<DataTypeDateTime64>(6)},
{"event_type", event_enum_type},
{"query_id", std::make_shared<DataTypeString>()},
{"thread_id", std::make_shared<DataTypeUInt64>()},
{"thread_name", std::make_shared<DataTypeString>()},
{"disk_name", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"bucket", std::make_shared<DataTypeString>()},
{"remote_path", std::make_shared<DataTypeString>()},
{"local_path", std::make_shared<DataTypeString>()},
{"data_size", std::make_shared<DataTypeUInt64>()},
{"error", std::make_shared<DataTypeString>()},
};
}
void BlobStorageLogElement::appendToBlock(MutableColumns & columns) const
{
#ifndef NDEBUG
auto column_names = BlobStorageLogElement::getNamesAndTypes().getNames();
#endif
size_t i = 0;
auto event_time_seconds = timeInSeconds(event_time);
assert(column_names.at(i) == "event_date");
columns[i++]->insert(DateLUT::instance().toDayNum(event_time_seconds).toUnderType());
assert(column_names.at(i) == "event_time");
columns[i++]->insert(event_time_seconds);
assert(column_names.at(i) == "event_time_microseconds");
columns[i++]->insert(Decimal64(timeInMicroseconds(event_time)));
assert(column_names.at(i) == "event_type");
columns[i++]->insert(static_cast<Int8>(event_type));
assert(column_names.at(i) == "query_id");
columns[i++]->insert(query_id);
assert(column_names.at(i) == "thread_id");
columns[i++]->insert(thread_id);
assert(column_names.at(i) == "thread_name");
columns[i++]->insert(thread_name);
assert(column_names.at(i) == "disk_name");
columns[i++]->insert(disk_name);
assert(column_names.at(i) == "bucket");
columns[i++]->insert(bucket);
assert(column_names.at(i) == "remote_path");
columns[i++]->insert(remote_path);
assert(column_names.at(i) == "local_path");
columns[i++]->insert(local_path);
assert(column_names.at(i) == "data_size");
columns[i++]->insert(data_size);
assert(column_names.at(i) == "error");
columns[i++]->insert(error_message);
assert(i == column_names.size() && columns.size() == column_names.size());
}
}

View File

@ -0,0 +1,57 @@
#pragma once
#include <Interpreters/SystemLog.h>
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>
#include <Poco/Message.h>
#include <chrono>
namespace DB
{
struct BlobStorageLogElement
{
enum class EventType : Int8
{
Upload = 1,
Delete = 2,
MultiPartUploadCreate = 3,
MultiPartUploadWrite = 4,
MultiPartUploadComplete = 5,
MultiPartUploadAbort = 6,
};
EventType event_type;
String query_id;
UInt64 thread_id = 0;
String thread_name;
String disk_name;
String bucket;
String remote_path;
String local_path;
size_t data_size;
Int32 error_code = -1; /// negative if no error
String error_message;
using EventTime = std::chrono::time_point<std::chrono::system_clock>;
EventTime event_time;
static std::string name() { return "BlobStorageLog"; }
static NamesAndTypesList getNamesAndTypes();
static NamesAndAliases getNamesAndAliases() { return {}; }
void appendToBlock(MutableColumns & columns) const;
static const char * getCustomColumnList() { return nullptr; }
};
class BlobStorageLog : public SystemLog<BlobStorageLogElement>
{
using SystemLog<BlobStorageLogElement>::SystemLog;
};
}

View File

@ -147,7 +147,7 @@ CacheGuard::Lock FileCache::lockCache() const
return cache_guard.lock();
}
FileSegments FileCache::getImpl(const LockedKey & locked_key, const FileSegment::Range & range) const
FileSegments FileCache::getImpl(const LockedKey & locked_key, const FileSegment::Range & range, size_t file_segments_limit) const
{
/// Given range = [left, right] and non-overlapping ordered set of file segments,
/// find list [segment1, ..., segmentN] of segments which intersect with given range.
@ -165,6 +165,9 @@ FileSegments FileCache::getImpl(const LockedKey & locked_key, const FileSegment:
FileSegments result;
auto add_to_result = [&](const FileSegmentMetadata & file_segment_metadata)
{
if (file_segments_limit && result.size() == file_segments_limit)
return false;
FileSegmentPtr file_segment;
if (!file_segment_metadata.evicting())
{
@ -180,6 +183,7 @@ FileSegments FileCache::getImpl(const LockedKey & locked_key, const FileSegment:
}
result.push_back(file_segment);
return true;
};
const auto & file_segments = locked_key;
@ -197,7 +201,8 @@ FileSegments FileCache::getImpl(const LockedKey & locked_key, const FileSegment:
if (file_segment_metadata.file_segment->range().right < range.left)
return {};
add_to_result(file_segment_metadata);
if (!add_to_result(file_segment_metadata))
return result;
}
else /// segment_it <-- segmment{k}
{
@ -213,7 +218,8 @@ FileSegments FileCache::getImpl(const LockedKey & locked_key, const FileSegment:
/// [___________
/// ^
/// range.left
add_to_result(prev_file_segment_metadata);
if (!add_to_result(prev_file_segment_metadata))
return result;
}
}
@ -229,7 +235,9 @@ FileSegments FileCache::getImpl(const LockedKey & locked_key, const FileSegment:
if (range.right < file_segment_metadata.file_segment->range().left)
break;
add_to_result(file_segment_metadata);
if (!add_to_result(file_segment_metadata))
return result;
++segment_it;
}
}
@ -237,11 +245,34 @@ FileSegments FileCache::getImpl(const LockedKey & locked_key, const FileSegment:
return result;
}
std::vector<FileSegment::Range> FileCache::splitRange(size_t offset, size_t size)
{
assert(size > 0);
std::vector<FileSegment::Range> ranges;
size_t current_pos = offset;
size_t end_pos_non_included = offset + size;
size_t remaining_size = size;
FileSegments file_segments;
while (current_pos < end_pos_non_included)
{
auto current_file_segment_size = std::min(remaining_size, max_file_segment_size);
ranges.emplace_back(current_pos, current_pos + current_file_segment_size - 1);
remaining_size -= current_file_segment_size;
current_pos += current_file_segment_size;
}
return ranges;
}
FileSegments FileCache::splitRangeIntoFileSegments(
LockedKey & locked_key,
size_t offset,
size_t size,
FileSegment::State state,
size_t file_segments_limit,
const CreateFileSegmentSettings & settings)
{
assert(size > 0);
@ -253,7 +284,7 @@ FileSegments FileCache::splitRangeIntoFileSegments(
size_t remaining_size = size;
FileSegments file_segments;
while (current_pos < end_pos_non_included)
while (current_pos < end_pos_non_included && (!file_segments_limit || file_segments.size() < file_segments_limit))
{
current_file_segment_size = std::min(remaining_size, max_file_segment_size);
remaining_size -= current_file_segment_size;
@ -265,7 +296,6 @@ FileSegments FileCache::splitRangeIntoFileSegments(
current_pos += current_file_segment_size;
}
assert(file_segments.empty() || offset + size - 1 == file_segments.back()->range().right);
return file_segments;
}
@ -273,6 +303,7 @@ void FileCache::fillHolesWithEmptyFileSegments(
LockedKey & locked_key,
FileSegments & file_segments,
const FileSegment::Range & range,
size_t file_segments_limit,
bool fill_with_detached_file_segments,
const CreateFileSegmentSettings & settings)
{
@ -289,6 +320,7 @@ void FileCache::fillHolesWithEmptyFileSegments(
assert(!file_segments.empty());
auto it = file_segments.begin();
size_t processed_count = 0;
auto segment_range = (*it)->range();
size_t current_pos;
@ -301,11 +333,17 @@ void FileCache::fillHolesWithEmptyFileSegments(
current_pos = segment_range.right + 1;
++it;
++processed_count;
}
else
current_pos = range.left;
while (current_pos <= range.right && it != file_segments.end())
auto is_limit_reached = [&]() -> bool
{
return file_segments_limit && processed_count >= file_segments_limit;
};
while (current_pos <= range.right && it != file_segments.end() && !is_limit_reached())
{
segment_range = (*it)->range();
@ -313,6 +351,7 @@ void FileCache::fillHolesWithEmptyFileSegments(
{
current_pos = segment_range.right + 1;
++it;
++processed_count;
continue;
}
@ -326,18 +365,47 @@ void FileCache::fillHolesWithEmptyFileSegments(
locked_key.getKey(), current_pos, hole_size, FileSegment::State::DETACHED, settings);
file_segments.insert(it, file_segment);
++processed_count;
}
else
{
auto split = splitRangeIntoFileSegments(
locked_key, current_pos, hole_size, FileSegment::State::EMPTY, settings);
file_segments.splice(it, std::move(split));
auto ranges = splitRange(current_pos, hole_size);
FileSegments hole;
for (const auto & r : ranges)
{
auto metadata_it = addFileSegment(locked_key, r.left, r.size(), FileSegment::State::EMPTY, settings, nullptr);
hole.push_back(metadata_it->second->file_segment);
++processed_count;
if (is_limit_reached())
break;
}
file_segments.splice(it, std::move(hole));
}
if (is_limit_reached())
break;
current_pos = segment_range.right + 1;
++it;
++processed_count;
}
auto erase_unprocessed = [&]()
{
chassert(file_segments.size() >= file_segments_limit);
file_segments.erase(it, file_segments.end());
chassert(file_segments.size() == file_segments_limit);
};
if (is_limit_reached())
{
erase_unprocessed();
return;
}
chassert(!file_segments_limit || file_segments.size() < file_segments_limit);
if (current_pos <= range.right)
{
/// ________] -- requested range
@ -356,9 +424,21 @@ void FileCache::fillHolesWithEmptyFileSegments(
}
else
{
auto split = splitRangeIntoFileSegments(
locked_key, current_pos, hole_size, FileSegment::State::EMPTY, settings);
file_segments.splice(file_segments.end(), std::move(split));
auto ranges = splitRange(current_pos, hole_size);
FileSegments hole;
for (const auto & r : ranges)
{
auto metadata_it = addFileSegment(locked_key, r.left, r.size(), FileSegment::State::EMPTY, settings, nullptr);
hole.push_back(metadata_it->second->file_segment);
++processed_count;
if (is_limit_reached())
break;
}
file_segments.splice(it, std::move(hole));
if (is_limit_reached())
erase_unprocessed();
}
}
}
@ -374,7 +454,7 @@ FileSegmentsHolderPtr FileCache::set(
auto locked_key = metadata.lockKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::CREATE_EMPTY);
FileSegment::Range range(offset, offset + size - 1);
auto file_segments = getImpl(*locked_key, range);
auto file_segments = getImpl(*locked_key, range, /* file_segments_limit */0);
if (!file_segments.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Having intersection with already existing cache");
@ -388,7 +468,7 @@ FileSegmentsHolderPtr FileCache::set(
else
{
file_segments = splitRangeIntoFileSegments(
*locked_key, offset, size, FileSegment::State::EMPTY, settings);
*locked_key, offset, size, FileSegment::State::EMPTY, /* file_segments_limit */0, settings);
}
return std::make_unique<FileSegmentsHolder>(std::move(file_segments));
@ -400,43 +480,137 @@ FileCache::getOrSet(
size_t offset,
size_t size,
size_t file_size,
const CreateFileSegmentSettings & settings)
const CreateFileSegmentSettings & settings,
size_t file_segments_limit)
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FilesystemCacheGetOrSetMicroseconds);
assertInitialized();
const auto aligned_offset = roundDownToMultiple(offset, boundary_alignment);
const auto aligned_end = std::min(roundUpToMultiple(offset + size, boundary_alignment), file_size);
const auto aligned_size = aligned_end - aligned_offset;
FileSegment::Range range(offset, offset + size - 1);
FileSegment::Range range(aligned_offset, aligned_offset + aligned_size - 1);
const auto aligned_offset = roundDownToMultiple(range.left, boundary_alignment);
auto aligned_end_offset = std::min(roundUpToMultiple(offset + size, boundary_alignment), file_size) - 1;
chassert(aligned_offset <= range.left);
chassert(aligned_end_offset >= range.right);
auto locked_key = metadata.lockKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::CREATE_EMPTY);
/// Get all segments which intersect with the given range.
auto file_segments = getImpl(*locked_key, range);
auto file_segments = getImpl(*locked_key, range, file_segments_limit);
if (file_segments_limit)
{
chassert(file_segments.size() <= file_segments_limit);
if (file_segments.size() == file_segments_limit)
range.right = aligned_end_offset = file_segments.back()->range().right;
}
/// Check case if we have uncovered prefix, e.g.
///
/// [_______________]
/// ^ ^
/// range.left range.right
/// [___] [__________] <-- current cache (example)
/// [ ]
/// ^----^
/// uncovered prefix.
const bool has_uncovered_prefix = file_segments.empty() || range.left < file_segments.front()->range().left;
if (aligned_offset < range.left && has_uncovered_prefix)
{
auto prefix_range = FileSegment::Range(aligned_offset, file_segments.empty() ? range.left - 1 : file_segments.front()->range().left - 1);
auto prefix_file_segments = getImpl(*locked_key, prefix_range, /* file_segments_limit */0);
if (prefix_file_segments.empty())
{
/// [____________________][_______________]
/// ^ ^ ^
/// aligned_offset range.left range.right
/// [___] [__________] <-- current cache (example)
range.left = aligned_offset;
}
else
{
/// [____________________][_______________]
/// ^ ^ ^
/// aligned_offset range.left range.right
/// ____] [____] [___] [__________] <-- current cache (example)
/// ^
/// prefix_file_segments.back().right
chassert(prefix_file_segments.back()->range().right < range.left);
chassert(prefix_file_segments.back()->range().right >= aligned_offset);
range.left = prefix_file_segments.back()->range().right + 1;
}
}
/// Check case if we have uncovered suffix.
///
/// [___________________]
/// ^ ^
/// range.left range.right
/// [___] [___] <-- current cache (example)
/// [___]
/// ^---^
/// uncovered_suffix
const bool has_uncovered_suffix = file_segments.empty() || file_segments.back()->range().right < range.right;
if (range.right < aligned_end_offset && has_uncovered_suffix)
{
auto suffix_range = FileSegment::Range(range.right, aligned_end_offset);
/// We need to get 1 file segment, so file_segments_limit = 1 here.
auto suffix_file_segments = getImpl(*locked_key, suffix_range, /* file_segments_limit */1);
if (suffix_file_segments.empty())
{
/// [__________________][ ]
/// ^ ^ ^
/// range.left range.right aligned_end_offset
/// [___] [___] <-- current cache (example)
range.right = aligned_end_offset;
}
else
{
/// [__________________][ ]
/// ^ ^ ^
/// range.left range.right aligned_end_offset
/// [___] [___] [_________] <-- current cache (example)
/// ^
/// suffix_file_segments.front().left
range.right = suffix_file_segments.front()->range().left - 1;
}
}
if (file_segments.empty())
{
file_segments = splitRangeIntoFileSegments(*locked_key, range.left, range.size(), FileSegment::State::EMPTY, settings);
file_segments = splitRangeIntoFileSegments(*locked_key, range.left, range.size(), FileSegment::State::EMPTY, file_segments_limit, settings);
}
else
{
chassert(file_segments.front()->range().right >= range.left);
chassert(file_segments.back()->range().left <= range.right);
fillHolesWithEmptyFileSegments(
*locked_key, file_segments, range, /* fill_with_detached */false, settings);
*locked_key, file_segments, range, file_segments_limit, /* fill_with_detached */false, settings);
if (!file_segments.front()->range().contains(offset))
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected {} to include {} "
"(end offset: {}, aligned offset: {}, aligned end offset: {})",
file_segments.front()->range().toString(), offset, range.right, aligned_offset, aligned_end_offset);
}
}
while (!file_segments.empty() && file_segments.front()->range().right < offset)
file_segments.pop_front();
chassert(file_segments_limit ? file_segments.back()->range().left <= range.right : file_segments.back()->range().contains(range.right));
chassert(!file_segments_limit || file_segments.size() <= file_segments_limit);
while (!file_segments.empty() && file_segments.back()->range().left >= offset + size)
file_segments.pop_back();
chassert(!file_segments.empty());
return std::make_unique<FileSegmentsHolder>(std::move(file_segments));
}
FileSegmentsHolderPtr FileCache::get(const Key & key, size_t offset, size_t size)
FileSegmentsHolderPtr FileCache::get(const Key & key, size_t offset, size_t size, size_t file_segments_limit)
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FilesystemCacheGetMicroseconds);
@ -448,12 +622,20 @@ FileSegmentsHolderPtr FileCache::get(const Key & key, size_t offset, size_t size
FileSegment::Range range(offset, offset + size - 1);
/// Get all segments which intersect with the given range.
auto file_segments = getImpl(*locked_key, range);
auto file_segments = getImpl(*locked_key, range, file_segments_limit);
if (!file_segments.empty())
{
fillHolesWithEmptyFileSegments(
*locked_key, file_segments, range, /* fill_with_detached */true, CreateFileSegmentSettings{});
if (file_segments_limit)
{
chassert(file_segments.size() <= file_segments_limit);
if (file_segments.size() == file_segments_limit)
range.right = file_segments.back()->range().right;
}
fillHolesWithEmptyFileSegments(
*locked_key, file_segments, range, file_segments_limit, /* fill_with_detached */true, CreateFileSegmentSettings{});
chassert(!file_segments_limit || file_segments.size() <= file_segments_limit);
return std::make_unique<FileSegmentsHolder>(std::move(file_segments));
}
}
@ -801,6 +983,11 @@ void FileCache::removePathIfExists(const String & path)
void FileCache::removeAllReleasable()
{
assertInitialized();
#ifdef ABORT_ON_LOGICAL_ERROR
assertCacheCorrectness();
#endif
metadata.removeAllKeys(/* if_releasable */true);
if (stash)

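To make the new splitRange() concrete: with a hypothetical max_file_segment_size of 10, a request at offset 0 with size 25 yields the subranges [0, 9], [10, 19], [20, 24]. A standalone restatement of that loop, returning inclusive [left, right] pairs:

    #include <algorithm>
    #include <cstddef>
    #include <utility>
    #include <vector>

    std::vector<std::pair<size_t, size_t>> splitRangeSketch(size_t offset, size_t size, size_t max_file_segment_size)
    {
        std::vector<std::pair<size_t, size_t>> ranges;
        size_t current_pos = offset;
        size_t remaining_size = size;
        while (remaining_size > 0)
        {
            size_t current_file_segment_size = std::min(remaining_size, max_file_segment_size);
            ranges.emplace_back(current_pos, current_pos + current_file_segment_size - 1);
            remaining_size -= current_file_segment_size;
            current_pos += current_file_segment_size;
        }
        return ranges;   // splitRangeSketch(0, 25, 10) -> {0,9}, {10,19}, {20,24}
    }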
View File

@ -80,8 +80,13 @@ public:
* As long as pointers to returned file segments are held
* it is guaranteed that these file segments are not removed from cache.
*/
FileSegmentsHolderPtr
getOrSet(const Key & key, size_t offset, size_t size, size_t file_size, const CreateFileSegmentSettings & settings);
FileSegmentsHolderPtr getOrSet(
const Key & key,
size_t offset,
size_t size,
size_t file_size,
const CreateFileSegmentSettings & settings,
size_t file_segments_limit = 0);
/**
* Segments in returned list are ordered in ascending order and represent a full contiguous
@ -92,7 +97,7 @@ public:
* with the destruction of the holder, while in getOrSet() EMPTY file segments can eventually change
* it's state (and become DOWNLOADED).
*/
FileSegmentsHolderPtr get(const Key & key, size_t offset, size_t size);
FileSegmentsHolderPtr get(const Key & key, size_t offset, size_t size, size_t file_segments_limit);
FileSegmentsHolderPtr set(const Key & key, size_t offset, size_t size, const CreateFileSegmentSettings & settings);
@ -204,26 +209,41 @@ private:
std::unique_ptr<ThreadFromGlobalPool> cleanup_thread;
void assertInitialized() const;
void assertCacheCorrectness();
void loadMetadata();
void loadMetadataImpl();
void loadMetadataForKeys(const std::filesystem::path & keys_dir);
FileSegments getImpl(const LockedKey & locked_key, const FileSegment::Range & range) const;
/// Get all file segments from cache which intersect with `range`.
/// If `file_segments_limit` > 0, return no more than the first `file_segments_limit`
/// file segments.
FileSegments getImpl(
const LockedKey & locked_key,
const FileSegment::Range & range,
size_t file_segments_limit) const;
/// Split the range into subranges by max_file_segment_size;
/// each subrange's size must be less than or equal to max_file_segment_size.
std::vector<FileSegment::Range> splitRange(size_t offset, size_t size);
/// Split range into subranges by max_file_segment_size (same as in splitRange())
/// and create a new file segment for each subrange.
/// If `file_segments_limit` > 0, create no more than the first `file_segments_limit`
/// file segments.
FileSegments splitRangeIntoFileSegments(
LockedKey & locked_key,
size_t offset,
size_t size,
FileSegment::State state,
size_t file_segments_limit,
const CreateFileSegmentSettings & create_settings);
void fillHolesWithEmptyFileSegments(
LockedKey & locked_key,
FileSegments & file_segments,
const FileSegment::Range & range,
size_t file_segments_limit,
bool fill_with_detached_file_segments,
const CreateFileSegmentSettings & settings);

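A hedged sketch of a caller of the extended getOrSet(); forwarding ReadSettings::filesystem_cache_segments_batch_size as the limit is an assumption based on the new setting elsewhere in this commit, not something this header shows:

    auto holder = cache->getOrSet(
        key, offset, size, file_size,
        CreateFileSegmentSettings{},
        /* file_segments_limit */ read_settings.filesystem_cache_segments_batch_size);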
View File

@ -58,6 +58,9 @@ void FileCacheSettings::loadImpl(FuncHas has, FuncGetUInt get_uint, FuncGetStrin
if (has("load_metadata_threads"))
load_metadata_threads = get_uint("load_metadata_threads");
if (boundary_alignment > max_file_segment_size)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting `boundary_alignment` cannot exceed `max_file_segment_size`");
}
void FileCacheSettings::loadFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)

View File

@ -924,8 +924,8 @@ void FileSegment::use()
}
}
FileSegmentsHolder::FileSegmentsHolder(FileSegments && file_segments_, bool complete_on_dtor_)
: file_segments(std::move(file_segments_)), complete_on_dtor(complete_on_dtor_)
FileSegmentsHolder::FileSegmentsHolder(FileSegments && file_segments_)
: file_segments(std::move(file_segments_))
{
CurrentMetrics::add(CurrentMetrics::FilesystemCacheHoldFileSegments, file_segments.size());
ProfileEvents::increment(ProfileEvents::FilesystemCacheHoldFileSegments, file_segments.size());
@ -935,9 +935,6 @@ FileSegmentsHolder::~FileSegmentsHolder()
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FileSegmentHolderCompleteMicroseconds);
if (!complete_on_dtor)
return;
ProfileEvents::increment(ProfileEvents::FilesystemCacheUnusedHoldFileSegments, file_segments.size());
for (auto file_segment_it = file_segments.begin(); file_segment_it != file_segments.end();)
file_segment_it = completeAndPopFrontImpl();

View File

@ -136,6 +136,8 @@ public:
size_t size() const { return right - left + 1; }
String toString() const { return fmt::format("[{}, {}]", std::to_string(left), std::to_string(right)); }
bool contains(size_t offset) const { return left <= offset && offset <= right; }
};
static String getCallerId();
@ -324,7 +326,7 @@ struct FileSegmentsHolder : private boost::noncopyable
{
FileSegmentsHolder() = default;
explicit FileSegmentsHolder(FileSegments && file_segments_, bool complete_on_dtor_ = true);
explicit FileSegmentsHolder(FileSegments && file_segments_);
~FileSegmentsHolder();
@ -350,7 +352,6 @@ struct FileSegmentsHolder : private boost::noncopyable
private:
FileSegments file_segments{};
const bool complete_on_dtor = true;
FileSegments::iterator completeAndPopFrontImpl();
};

View File

@ -3667,16 +3667,25 @@ std::shared_ptr<BackupLog> Context::getBackupLog() const
return shared->system_logs->backup_log;
}
std::shared_ptr<BlobStorageLog> Context::getBlobStorageLog() const
{
SharedLockGuard lock(shared->mutex);
if (!shared->system_logs)
return {};
return shared->system_logs->blob_storage_log;
}
std::vector<ISystemLog *> Context::getSystemLogs() const
{
SharedLockGuard lock(shared->mutex);
if (!shared->system_logs)
return {};
return shared->system_logs->logs;
}
CompressionCodecPtr Context::chooseCompressionCodec(size_t part_size, double part_size_ratio) const
{
std::lock_guard lock(shared->mutex);
@ -4804,6 +4813,7 @@ ReadSettings Context::getReadSettings() const
res.enable_filesystem_cache = settings.enable_filesystem_cache;
res.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache;
res.enable_filesystem_cache_log = settings.enable_filesystem_cache_log;
res.filesystem_cache_segments_batch_size = settings.filesystem_cache_segments_batch_size;
res.filesystem_cache_max_download_size = settings.filesystem_cache_max_download_size;
res.skip_download_if_exceeds_query_cache = settings.skip_download_if_exceeds_query_cache;

View File

@ -107,6 +107,7 @@ class FilesystemReadPrefetchesLog;
class S3QueueLog;
class AsynchronousInsertLog;
class BackupLog;
class BlobStorageLog;
class IAsynchronousReader;
struct MergeTreeSettings;
struct InitialAllRangesAnnouncement;
@ -1057,6 +1058,7 @@ public:
std::shared_ptr<FilesystemReadPrefetchesLog> getFilesystemReadPrefetchesLog() const;
std::shared_ptr<AsynchronousInsertLog> getAsynchronousInsertLog() const;
std::shared_ptr<BackupLog> getBackupLog() const;
std::shared_ptr<BlobStorageLog> getBlobStorageLog() const;
std::vector<ISystemLog *> getSystemLogs() const;

View File

@ -209,7 +209,7 @@ Block InterpreterSelectQueryAnalyzer::getSampleBlock(const QueryTreeNodePtr & qu
{
auto select_query_options_copy = select_query_options;
select_query_options_copy.only_analyze = true;
InterpreterSelectQueryAnalyzer interpreter(query_tree, context, select_query_options);
InterpreterSelectQueryAnalyzer interpreter(query_tree, context, select_query_options_copy);
return interpreter.getSampleBlock();
}

View File

@ -35,6 +35,7 @@
#include <Interpreters/ProcessorsProfileLog.h>
#include <Interpreters/AsynchronousInsertLog.h>
#include <Interpreters/BackupLog.h>
#include <IO/S3/BlobStorageLogWriter.h>
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Interpreters/TransactionLog.h>
#include <Interpreters/AsynchronousInsertQueue.h>

View File

@ -10,6 +10,7 @@
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Interpreters/PartLog.h>
#include <Interpreters/ProcessorsProfileLog.h>
#include <Interpreters/BlobStorageLog.h>
#include <Interpreters/QueryLog.h>
#include <Interpreters/QueryThreadLog.h>
#include <Interpreters/QueryViewsLog.h>
@ -291,6 +292,7 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf
asynchronous_insert_log = createSystemLog<AsynchronousInsertLog>(global_context, "system", "asynchronous_insert_log", config, "asynchronous_insert_log");
backup_log = createSystemLog<BackupLog>(global_context, "system", "backup_log", config, "backup_log");
s3_queue_log = createSystemLog<S3QueueLog>(global_context, "system", "s3queue_log", config, "s3queue_log");
blob_storage_log = createSystemLog<BlobStorageLog>(global_context, "system", "blob_storage_log", config, "blob_storage_log");
if (query_log)
logs.emplace_back(query_log.get());
@ -333,6 +335,8 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf
logs.emplace_back(backup_log.get());
if (s3_queue_log)
logs.emplace_back(s3_queue_log.get());
if (blob_storage_log)
logs.emplace_back(blob_storage_log.get());
try
{

View File

@ -51,6 +51,7 @@ class FilesystemReadPrefetchesLog;
class AsynchronousInsertLog;
class BackupLog;
class S3QueueLog;
class BlobStorageLog;
/// System logs should be destroyed in destructor of the last Context and before tables,
/// because SystemLog destruction makes insert query while flushing data into underlying tables
@ -89,6 +90,8 @@ struct SystemLogs
std::shared_ptr<AsynchronousInsertLog> asynchronous_insert_log;
/// Backup and restore events
std::shared_ptr<BackupLog> backup_log;
/// Log blob storage operations
std::shared_ptr<BlobStorageLog> blob_storage_log;
std::vector<ISystemLog *> logs;
};

View File

@ -734,6 +734,12 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
bool is_create_parameterized_view = false;
if (const auto * create_query = ast->as<ASTCreateQuery>())
is_create_parameterized_view = create_query->isParameterizedView();
else if (const auto * explain_query = ast->as<ASTExplainQuery>())
{
assert(!explain_query->children.empty());
if (const auto * create_of_explain_query = explain_query->children[0]->as<ASTCreateQuery>())
is_create_parameterized_view = create_of_explain_query->isParameterizedView();
}
/// Replace ASTQueryParameter with ASTLiteral for prepared statements.
/// Even if we don't have parameters in query_context, check that AST doesn't have unknown parameters

View File

@ -242,7 +242,7 @@ TEST_F(FileCacheTest, get)
settings.max_elements = 5;
settings.boundary_alignment = 1;
const size_t file_size = -1; // the value doesn't really matter because boundary_alignment == 1.
const size_t file_size = INT_MAX; // the value doesn't really matter because boundary_alignment == 1.
{
std::cerr << "Step 1\n";

View File

@ -116,7 +116,7 @@ namespace
void checkStoragesSupportTransactions(const PlannerContextPtr & planner_context)
{
const auto & query_context = planner_context->getQueryContext();
if (query_context->getSettingsRef().throw_on_unsupported_query_inside_transaction)
if (!query_context->getSettingsRef().throw_on_unsupported_query_inside_transaction)
return;
if (!query_context->getCurrentTransaction())
@ -130,13 +130,11 @@ void checkStoragesSupportTransactions(const PlannerContextPtr & planner_context)
else if (auto * table_function_node = table_expression->as<TableFunctionNode>())
storage = table_function_node->getStorage();
if (storage->supportsTransactions())
continue;
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Storage {} (table {}) does not support transactions",
storage->getName(),
storage->getStorageID().getNameForLogs());
if (storage && !storage->supportsTransactions())
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Storage {} (table {}) does not support transactions",
storage->getName(),
storage->getStorageID().getNameForLogs());
}
}
@ -1334,9 +1332,9 @@ void Planner::buildPlanForQueryNode()
query_node.getHaving() = {};
}
checkStoragesSupportTransactions(planner_context);
collectSets(query_tree, *planner_context);
collectTableExpressionData(query_tree, planner_context);
checkStoragesSupportTransactions(planner_context);
if (!select_query_options.only_analyze)
collectFiltersForAnalysis(query_tree, planner_context);
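
Two fixes land in checkStoragesSupportTransactions: the early-return guard was inverted (the function must bail out when throw_on_unsupported_query_inside_transaction is disabled, not when it is enabled), and storage can be null for some table expressions, so the check becomes storage && !storage->supportsTransactions(). The call site also moves below collectSets and collectTableExpressionData. A hedged sketch of the corrected guard shape, with illustrative names only:

    #include <stdexcept>
    #include <vector>

    struct StorageLike { bool supportsTransactions() const { return false; } };

    void checkStoragesSupportTransactionsLike(bool throw_on_unsupported,
                                              const std::vector<const StorageLike *> & storages)
    {
        if (!throw_on_unsupported)   // corrected: skip the whole check when the setting is off
            return;
        for (const auto * storage : storages)
            if (storage && !storage->supportsTransactions())   // null-safe: some expressions carry no storage
                throw std::runtime_error("storage does not support transactions");
    }

    int main()
    {
        StorageLike storage;
        checkStoragesSupportTransactionsLike(false, {&storage, nullptr});   // disabled: no throw
        return 0;
    }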

View File

@ -35,6 +35,17 @@ RemoteSource::RemoteSource(RemoteQueryExecutorPtr executor, bool add_aggregation
addTotalBytes(value.total_bytes_to_read);
progress(value.read_rows, value.read_bytes);
});
query_executor->setProfileInfoCallback([this](const ProfileInfo & info)
{
if (rows_before_limit)
{
if (info.hasAppliedLimit())
rows_before_limit->add(info.getRowsBeforeLimit());
else
manually_add_rows_before_limit_counter = true; /// Remote subquery doesn't contain a limit
}
});
}
RemoteSource::~RemoteSource() = default;
@ -52,7 +63,7 @@ void RemoteSource::setStorageLimits(const std::shared_ptr<const StorageLimitsLis
ISource::Status RemoteSource::prepare()
{
/// Check if query was cancelled before returning Async status. Otherwise it may lead to infinite loop.
if (was_query_canceled)
if (isCancelled())
{
getPort().finish();
return Status::Finished;
@ -77,23 +88,11 @@ ISource::Status RemoteSource::prepare()
std::optional<Chunk> RemoteSource::tryGenerate()
{
/// onCancel() will do the cancel if the query was sent.
if (was_query_canceled)
if (isCancelled())
return {};
if (!was_query_sent)
{
/// Get rows_before_limit result for remote query from ProfileInfo packet.
query_executor->setProfileInfoCallback([this](const ProfileInfo & info)
{
if (rows_before_limit)
{
if (info.hasAppliedLimit())
rows_before_limit->add(info.getRowsBeforeLimit());
else
manually_add_rows_before_limit_counter = true; /// Remote subquery doesn't contain a limit
}
});
if (async_query_sending)
{
int fd_ = query_executor->sendQueryAsync();
@ -169,7 +168,6 @@ std::optional<Chunk> RemoteSource::tryGenerate()
void RemoteSource::onCancel()
{
was_query_canceled = true;
query_executor->cancel();
}
@ -177,7 +175,6 @@ void RemoteSource::onUpdatePorts()
{
if (getPort().isFinished())
{
was_query_canceled = true;
query_executor->finish();
}
}
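
Two simplifications here: the ProfileInfo callback is registered once in the constructor instead of lazily on the first tryGenerate() call, and the extra was_query_canceled flag is dropped — ISource already tracks cancellation, so isCancelled() becomes the single source of truth and onCancel()/onUpdatePorts() no longer mirror it into a second atomic. A small standalone analogue of keeping one cancellation flag in the base class (names are illustrative, not the real ISource API):

    #include <atomic>
    #include <cassert>

    struct SourceBase
    {
        void cancel() { cancelled.store(true, std::memory_order_relaxed); }
        bool isCancelled() const { return cancelled.load(std::memory_order_relaxed); }
    private:
        std::atomic<bool> cancelled{false};
    };

    struct RemoteLikeSource : SourceBase
    {
        // No second flag: prepare()/tryGenerate() consult isCancelled() directly,
        // so the cancellation state cannot drift between base and derived class.
        bool shouldStop() const { return isCancelled(); }
    };

    int main()
    {
        RemoteLikeSource source;
        assert(!source.shouldStop());
        source.cancel();
        assert(source.shouldStop());
        return 0;
    }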

View File

@ -39,7 +39,6 @@ protected:
void onCancel() override;
private:
std::atomic<bool> was_query_canceled = false;
bool was_query_sent = false;
bool add_aggregation_info = false;
RemoteQueryExecutorPtr query_executor;

View File

@ -771,10 +771,16 @@ bool RemoteQueryExecutor::hasThrownException() const
void RemoteQueryExecutor::setProgressCallback(ProgressCallback callback)
{
std::lock_guard guard(was_cancelled_mutex);
progress_callback = std::move(callback);
if (extension && extension->parallel_reading_coordinator)
extension->parallel_reading_coordinator->setProgressCallback(progress_callback);
}
void RemoteQueryExecutor::setProfileInfoCallback(ProfileInfoCallback callback)
{
std::lock_guard guard(was_cancelled_mutex);
profile_info_callback = std::move(callback);
}
}
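
setProfileInfoCallback gains an out-of-line definition so it can take was_cancelled_mutex, matching setProgressCallback just above it; installing the callback is then serialized with the paths that hold the same lock. A generic sketch of the mutex-guarded setter pattern (the callback payload type is an assumption for illustration):

    #include <functional>
    #include <mutex>
    #include <utility>

    struct ExecutorLike
    {
        using ProfileInfoCallback = std::function<void(int)>;

        void setProfileInfoCallback(ProfileInfoCallback callback)
        {
            std::lock_guard guard(mutex);           // same mutex the read/cancel path locks
            profile_info_callback = std::move(callback);
        }

    private:
        std::mutex mutex;
        ProfileInfoCallback profile_info_callback;
    };

    int main()
    {
        ExecutorLike executor;
        executor.setProfileInfoCallback([](int) { /* consume profile info */ });
        return 0;
    }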

View File

@ -168,7 +168,7 @@ public:
void setProgressCallback(ProgressCallback callback);
/// Set callback for profile info. It will be called on ProfileInfo packet.
void setProfileInfoCallback(ProfileInfoCallback callback) { profile_info_callback = std::move(callback); }
void setProfileInfoCallback(ProfileInfoCallback callback);
/// Set the query_id. For now, used by performance test to later find the query
/// in the server query_log. Must be called before sending the query to the server.

View File

@ -84,6 +84,7 @@ struct Settings;
M(UInt64, min_delay_to_insert_ms, 10, "Min delay of inserting data into MergeTree table in milliseconds, if there are a lot of unmerged parts in single partition.", 0) \
M(UInt64, max_parts_in_total, 100000, "If more than this number active parts in all partitions in total, throw 'Too many parts ...' exception.", 0) \
M(Bool, async_insert, false, "If true, data from INSERT query is stored in queue and later flushed to table in background.", 0) \
M(Bool, add_implicit_sign_column_constraint_for_collapsing_engine, false, "If true, add implicit constraint for sign column for CollapsingMergeTree engine.", 0) \
M(Milliseconds, sleep_before_commit_local_part_in_replicated_table_ms, 0, "For testing. Do not change it.", 0) \
\
/* Part removal settings. */ \

View File

@ -598,11 +598,6 @@ static StoragePtr create(const StorageFactory::Arguments & args)
metadata.projections.add(std::move(projection));
}
auto constraints = metadata.constraints.getConstraints();
if (args.query.columns_list && args.query.columns_list->constraints)
for (auto & constraint : args.query.columns_list->constraints->children)
constraints.push_back(constraint);
metadata.constraints = ConstraintsDescription(constraints);
auto column_ttl_asts = columns.getColumnTTLs();
for (const auto & [name, ast] : column_ttl_asts)
@ -620,6 +615,30 @@ static StoragePtr create(const StorageFactory::Arguments & args)
args.getLocalContext()->checkMergeTreeSettingsConstraints(initial_storage_settings, storage_settings->changes());
metadata.settings_changes = args.storage_def->settings->ptr();
}
auto constraints = metadata.constraints.getConstraints();
if (args.query.columns_list && args.query.columns_list->constraints)
for (auto & constraint : args.query.columns_list->constraints->children)
constraints.push_back(constraint);
if ((merging_params.mode == MergeTreeData::MergingParams::Collapsing ||
merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing) &&
storage_settings->add_implicit_sign_column_constraint_for_collapsing_engine)
{
auto sign_column_check_constraint = std::make_unique<ASTConstraintDeclaration>();
sign_column_check_constraint->name = "check_sign_column";
sign_column_check_constraint->type = ASTConstraintDeclaration::Type::CHECK;
Array valid_values_array;
valid_values_array.emplace_back(-1);
valid_values_array.emplace_back(1);
auto valid_values_ast = std::make_unique<ASTLiteral>(std::move(valid_values_array));
auto sign_column_ast = std::make_unique<ASTIdentifier>(merging_params.sign_column);
sign_column_check_constraint->set(sign_column_check_constraint->expr, makeASTFunction("in", std::move(sign_column_ast), std::move(valid_values_ast)));
constraints.push_back(std::move(sign_column_check_constraint));
}
metadata.constraints = ConstraintsDescription(constraints);
}
else
{
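
With the new MergeTree setting enabled, (Versioned)CollapsingMergeTree injects a CHECK constraint named check_sign_column equivalent to declaring CONSTRAINT check_sign_column CHECK sign IN (-1, 1) by hand; the constraint-building block also moves below the settings handling so the freshly applied storage_settings can be consulted. A hedged illustration of the predicate the implicit constraint enforces:

    #include <cassert>

    // Illustrative only: the generated AST boils down to `sign IN (-1, 1)`.
    static bool check_sign_column(int sign) { return sign == -1 || sign == 1; }

    int main()
    {
        assert(check_sign_column(1));
        assert(check_sign_column(-1));
        assert(!check_sign_column(0));   // such a row would now fail the implicit constraint
        return 0;
    }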

View File

@ -43,6 +43,7 @@ void S3QueueMetadataFactory::remove(const std::string & zookeeper_path)
if (it == metadata_by_path.end())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Metadata with zookeeper path {} does not exist", zookeeper_path);
chassert(it->second.ref_count > 0);
if (--it->second.ref_count == 0)
{
try
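
The added chassert spells out the invariant behind the reference-counted metadata map: remove() must only run while at least one reference is outstanding, and the entry is torn down only when the count drops to zero. A hedged sketch of that shape — the erase and the example path are illustrative stand-ins for the real cleanup that follows the try block:

    #include <cassert>
    #include <map>
    #include <string>

    struct MetadataEntry { int ref_count = 0; };

    void removeMetadata(std::map<std::string, MetadataEntry> & by_path, const std::string & path)
    {
        auto it = by_path.find(path);
        assert(it != by_path.end());        // the real code throws BAD_ARGUMENTS here
        assert(it->second.ref_count > 0);   // the invariant the new chassert makes explicit
        if (--it->second.ref_count == 0)
            by_path.erase(it);              // stand-in for the actual cleanup
    }

    int main()
    {
        std::map<std::string, MetadataEntry> by_path{{"/illustrative/zk/path", MetadataEntry{2}}};
        removeMetadata(by_path, "/illustrative/zk/path");   // 2 -> 1, entry kept
        removeMetadata(by_path, "/illustrative/zk/path");   // 1 -> 0, entry removed
        return by_path.empty() ? 0 : 1;
    }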

Some files were not shown because too many files have changed in this diff.