mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-26 17:41:59 +00:00
Merge branch 'master' into tavplubix-patch-8
This commit is contained in:
commit
abb80d45f0
@ -161,5 +161,9 @@
|
||||
"docker/test/sqllogic": {
|
||||
"name": "clickhouse/sqllogic-test",
|
||||
"dependent": []
|
||||
},
|
||||
"docker/test/integration/nginx_dav": {
|
||||
"name": "clickhouse/nginx-dav",
|
||||
"dependent": []
|
||||
}
|
||||
}
|
||||
|
@ -32,7 +32,7 @@ ENV MSAN_OPTIONS='abort_on_error=1 poison_in_dtor=1'
|
||||
RUN echo "en_US.UTF-8 UTF-8" > /etc/locale.gen && locale-gen en_US.UTF-8
|
||||
ENV LC_ALL en_US.UTF-8
|
||||
|
||||
ENV TZ=Europe/Moscow
|
||||
ENV TZ=Europe/Amsterdam
|
||||
RUN ln -snf "/usr/share/zoneinfo/$TZ" /etc/localtime && echo "$TZ" > /etc/timezone
|
||||
|
||||
CMD sleep 1
|
||||
|
@ -32,7 +32,7 @@ RUN mkdir -p /tmp/clickhouse-odbc-tmp \
|
||||
&& odbcinst -i -s -l -f /tmp/clickhouse-odbc-tmp/share/doc/clickhouse-odbc/config/odbc.ini.sample \
|
||||
&& rm -rf /tmp/clickhouse-odbc-tmp
|
||||
|
||||
ENV TZ=Europe/Moscow
|
||||
ENV TZ=Europe/Amsterdam
|
||||
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
|
||||
|
||||
ENV COMMIT_SHA=''
|
||||
|
@ -8,7 +8,7 @@ ARG apt_archive="http://archive.ubuntu.com"
|
||||
RUN sed -i "s|http://archive.ubuntu.com|$apt_archive|g" /etc/apt/sources.list
|
||||
|
||||
ENV LANG=C.UTF-8
|
||||
ENV TZ=Europe/Moscow
|
||||
ENV TZ=Europe/Amsterdam
|
||||
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
|
||||
|
||||
RUN apt-get update \
|
||||
|
6
docker/test/integration/nginx_dav/Dockerfile
Normal file
6
docker/test/integration/nginx_dav/Dockerfile
Normal file
@ -0,0 +1,6 @@
|
||||
FROM nginx:alpine-slim
|
||||
|
||||
COPY default.conf /etc/nginx/conf.d/
|
||||
|
||||
RUN mkdir /usr/share/nginx/files/ \
|
||||
&& chown nginx: /usr/share/nginx/files/ -R
|
25
docker/test/integration/nginx_dav/default.conf
Normal file
25
docker/test/integration/nginx_dav/default.conf
Normal file
@ -0,0 +1,25 @@
|
||||
server {
|
||||
listen 80;
|
||||
|
||||
#root /usr/share/nginx/test.com;
|
||||
index index.html index.htm;
|
||||
|
||||
server_name test.com localhost;
|
||||
|
||||
location / {
|
||||
expires max;
|
||||
root /usr/share/nginx/files;
|
||||
client_max_body_size 20m;
|
||||
client_body_temp_path /usr/share/nginx/tmp;
|
||||
dav_methods PUT; # Allowed methods, only PUT is necessary
|
||||
|
||||
create_full_put_path on; # nginx automatically creates nested directories
|
||||
dav_access user:rw group:r all:r; # access permissions for files
|
||||
|
||||
limit_except GET {
|
||||
allow all;
|
||||
}
|
||||
}
|
||||
|
||||
error_page 405 =200 $uri;
|
||||
}
|
@ -1,16 +1,15 @@
|
||||
version: '2.3'
|
||||
services:
|
||||
meili1:
|
||||
image: getmeili/meilisearch:v0.27.0
|
||||
image: getmeili/meilisearch:v0.27.0
|
||||
restart: always
|
||||
ports:
|
||||
- ${MEILI_EXTERNAL_PORT:-7700}:${MEILI_INTERNAL_PORT:-7700}
|
||||
|
||||
meili_secure:
|
||||
image: getmeili/meilisearch:v0.27.0
|
||||
image: getmeili/meilisearch:v0.27.0
|
||||
restart: always
|
||||
ports:
|
||||
- ${MEILI_SECURE_EXTERNAL_PORT:-7700}:${MEILI_SECURE_INTERNAL_PORT:-7700}
|
||||
environment:
|
||||
MEILI_MASTER_KEY: "password"
|
||||
|
||||
|
@ -9,10 +9,10 @@ services:
|
||||
DATADIR: /mysql/
|
||||
expose:
|
||||
- ${MYSQL_PORT:-3306}
|
||||
command: --server_id=100
|
||||
--log-bin='mysql-bin-1.log'
|
||||
--default-time-zone='+3:00'
|
||||
--gtid-mode="ON"
|
||||
command: --server_id=100
|
||||
--log-bin='mysql-bin-1.log'
|
||||
--default-time-zone='+3:00'
|
||||
--gtid-mode="ON"
|
||||
--enforce-gtid-consistency
|
||||
--log-error-verbosity=3
|
||||
--log-error=/mysql/error.log
|
||||
@ -21,4 +21,4 @@ services:
|
||||
volumes:
|
||||
- type: ${MYSQL_LOGS_FS:-tmpfs}
|
||||
source: ${MYSQL_LOGS:-}
|
||||
target: /mysql/
|
||||
target: /mysql/
|
||||
|
@ -9,9 +9,9 @@ services:
|
||||
DATADIR: /mysql/
|
||||
expose:
|
||||
- ${MYSQL8_PORT:-3306}
|
||||
command: --server_id=100 --log-bin='mysql-bin-1.log'
|
||||
--default_authentication_plugin='mysql_native_password'
|
||||
--default-time-zone='+3:00' --gtid-mode="ON"
|
||||
command: --server_id=100 --log-bin='mysql-bin-1.log'
|
||||
--default_authentication_plugin='mysql_native_password'
|
||||
--default-time-zone='+3:00' --gtid-mode="ON"
|
||||
--enforce-gtid-consistency
|
||||
--log-error-verbosity=3
|
||||
--log-error=/mysql/error.log
|
||||
@ -20,4 +20,4 @@ services:
|
||||
volumes:
|
||||
- type: ${MYSQL8_LOGS_FS:-tmpfs}
|
||||
source: ${MYSQL8_LOGS:-}
|
||||
target: /mysql/
|
||||
target: /mysql/
|
||||
|
@ -9,10 +9,10 @@ services:
|
||||
DATADIR: /mysql/
|
||||
expose:
|
||||
- ${MYSQL_CLUSTER_PORT:-3306}
|
||||
command: --server_id=100
|
||||
--log-bin='mysql-bin-2.log'
|
||||
--default-time-zone='+3:00'
|
||||
--gtid-mode="ON"
|
||||
command: --server_id=100
|
||||
--log-bin='mysql-bin-2.log'
|
||||
--default-time-zone='+3:00'
|
||||
--gtid-mode="ON"
|
||||
--enforce-gtid-consistency
|
||||
--log-error-verbosity=3
|
||||
--log-error=/mysql/2_error.log
|
||||
@ -31,10 +31,10 @@ services:
|
||||
DATADIR: /mysql/
|
||||
expose:
|
||||
- ${MYSQL_CLUSTER_PORT:-3306}
|
||||
command: --server_id=100
|
||||
--log-bin='mysql-bin-3.log'
|
||||
--default-time-zone='+3:00'
|
||||
--gtid-mode="ON"
|
||||
command: --server_id=100
|
||||
--log-bin='mysql-bin-3.log'
|
||||
--default-time-zone='+3:00'
|
||||
--gtid-mode="ON"
|
||||
--enforce-gtid-consistency
|
||||
--log-error-verbosity=3
|
||||
--log-error=/mysql/3_error.log
|
||||
@ -53,10 +53,10 @@ services:
|
||||
DATADIR: /mysql/
|
||||
expose:
|
||||
- ${MYSQL_CLUSTER_PORT:-3306}
|
||||
command: --server_id=100
|
||||
--log-bin='mysql-bin-4.log'
|
||||
--default-time-zone='+3:00'
|
||||
--gtid-mode="ON"
|
||||
command: --server_id=100
|
||||
--log-bin='mysql-bin-4.log'
|
||||
--default-time-zone='+3:00'
|
||||
--gtid-mode="ON"
|
||||
--enforce-gtid-consistency
|
||||
--log-error-verbosity=3
|
||||
--log-error=/mysql/4_error.log
|
||||
@ -65,4 +65,4 @@ services:
|
||||
volumes:
|
||||
- type: ${MYSQL_CLUSTER_LOGS_FS:-tmpfs}
|
||||
source: ${MYSQL_CLUSTER_LOGS:-}
|
||||
target: /mysql/
|
||||
target: /mysql/
|
||||
|
@ -5,7 +5,7 @@ services:
|
||||
# Files will be put into /usr/share/nginx/files.
|
||||
|
||||
nginx:
|
||||
image: kssenii/nginx-test:1.1
|
||||
image: clickhouse/nginx-dav:${DOCKER_NGINX_DAV_TAG:-latest}
|
||||
restart: always
|
||||
ports:
|
||||
- 80:80
|
||||
|
@ -12,9 +12,9 @@ services:
|
||||
timeout: 5s
|
||||
retries: 5
|
||||
networks:
|
||||
default:
|
||||
aliases:
|
||||
- postgre-sql.local
|
||||
default:
|
||||
aliases:
|
||||
- postgre-sql.local
|
||||
environment:
|
||||
POSTGRES_HOST_AUTH_METHOD: "trust"
|
||||
POSTGRES_PASSWORD: mysecretpassword
|
||||
|
@ -12,7 +12,7 @@ services:
|
||||
command: ["zkServer.sh", "start-foreground"]
|
||||
entrypoint: /zookeeper-ssl-entrypoint.sh
|
||||
volumes:
|
||||
- type: bind
|
||||
- type: bind
|
||||
source: /misc/zookeeper-ssl-entrypoint.sh
|
||||
target: /zookeeper-ssl-entrypoint.sh
|
||||
- type: bind
|
||||
@ -37,7 +37,7 @@ services:
|
||||
command: ["zkServer.sh", "start-foreground"]
|
||||
entrypoint: /zookeeper-ssl-entrypoint.sh
|
||||
volumes:
|
||||
- type: bind
|
||||
- type: bind
|
||||
source: /misc/zookeeper-ssl-entrypoint.sh
|
||||
target: /zookeeper-ssl-entrypoint.sh
|
||||
- type: bind
|
||||
@ -61,7 +61,7 @@ services:
|
||||
command: ["zkServer.sh", "start-foreground"]
|
||||
entrypoint: /zookeeper-ssl-entrypoint.sh
|
||||
volumes:
|
||||
- type: bind
|
||||
- type: bind
|
||||
source: /misc/zookeeper-ssl-entrypoint.sh
|
||||
target: /zookeeper-ssl-entrypoint.sh
|
||||
- type: bind
|
||||
|
@ -64,15 +64,16 @@ export CLICKHOUSE_ODBC_BRIDGE_BINARY_PATH=/clickhouse-odbc-bridge
|
||||
export CLICKHOUSE_LIBRARY_BRIDGE_BINARY_PATH=/clickhouse-library-bridge
|
||||
|
||||
export DOCKER_BASE_TAG=${DOCKER_BASE_TAG:=latest}
|
||||
export DOCKER_HELPER_TAG=${DOCKER_HELPER_TAG:=latest}
|
||||
export DOCKER_MYSQL_GOLANG_CLIENT_TAG=${DOCKER_MYSQL_GOLANG_CLIENT_TAG:=latest}
|
||||
export DOCKER_DOTNET_CLIENT_TAG=${DOCKER_DOTNET_CLIENT_TAG:=latest}
|
||||
export DOCKER_HELPER_TAG=${DOCKER_HELPER_TAG:=latest}
|
||||
export DOCKER_KERBERIZED_HADOOP_TAG=${DOCKER_KERBERIZED_HADOOP_TAG:=latest}
|
||||
export DOCKER_KERBEROS_KDC_TAG=${DOCKER_KERBEROS_KDC_TAG:=latest}
|
||||
export DOCKER_MYSQL_GOLANG_CLIENT_TAG=${DOCKER_MYSQL_GOLANG_CLIENT_TAG:=latest}
|
||||
export DOCKER_MYSQL_JAVA_CLIENT_TAG=${DOCKER_MYSQL_JAVA_CLIENT_TAG:=latest}
|
||||
export DOCKER_MYSQL_JS_CLIENT_TAG=${DOCKER_MYSQL_JS_CLIENT_TAG:=latest}
|
||||
export DOCKER_MYSQL_PHP_CLIENT_TAG=${DOCKER_MYSQL_PHP_CLIENT_TAG:=latest}
|
||||
export DOCKER_NGINX_DAV_TAG=${DOCKER_NGINX_DAV_TAG:=latest}
|
||||
export DOCKER_POSTGRESQL_JAVA_CLIENT_TAG=${DOCKER_POSTGRESQL_JAVA_CLIENT_TAG:=latest}
|
||||
export DOCKER_KERBEROS_KDC_TAG=${DOCKER_KERBEROS_KDC_TAG:=latest}
|
||||
export DOCKER_KERBERIZED_HADOOP_TAG=${DOCKER_KERBERIZED_HADOOP_TAG:=latest}
|
||||
|
||||
cd /ClickHouse/tests/integration
|
||||
exec "$@"
|
||||
|
@ -11,7 +11,7 @@ ARG apt_archive="http://archive.ubuntu.com"
|
||||
RUN sed -i "s|http://archive.ubuntu.com|$apt_archive|g" /etc/apt/sources.list
|
||||
|
||||
ENV LANG=C.UTF-8
|
||||
ENV TZ=Europe/Moscow
|
||||
ENV TZ=Europe/Amsterdam
|
||||
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
|
||||
|
||||
RUN apt-get update \
|
||||
|
@ -52,7 +52,7 @@ RUN mkdir -p /tmp/clickhouse-odbc-tmp \
|
||||
&& odbcinst -i -s -l -f /tmp/clickhouse-odbc-tmp/share/doc/clickhouse-odbc/config/odbc.ini.sample \
|
||||
&& rm -rf /tmp/clickhouse-odbc-tmp
|
||||
|
||||
ENV TZ=Europe/Moscow
|
||||
ENV TZ=Europe/Amsterdam
|
||||
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
|
||||
|
||||
ENV NUM_TRIES=1
|
||||
|
@ -233,4 +233,10 @@ rowNumberInAllBlocks()
|
||||
LIMIT 1" < /test_output/test_results.tsv > /test_output/check_status.tsv || echo "failure\tCannot parse test_results.tsv" > /test_output/check_status.tsv
|
||||
[ -s /test_output/check_status.tsv ] || echo -e "success\tNo errors found" > /test_output/check_status.tsv
|
||||
|
||||
# But OOMs in stress test are allowed
|
||||
if rg 'OOM in dmesg|Signal 9' /test_output/check_status.tsv
|
||||
then
|
||||
sed -i 's/failure/success/' /test_output/check_status.tsv
|
||||
fi
|
||||
|
||||
collect_core_dumps
|
||||
|
@ -231,4 +231,10 @@ rowNumberInAllBlocks()
|
||||
LIMIT 1" < /test_output/test_results.tsv > /test_output/check_status.tsv || echo "failure\tCannot parse test_results.tsv" > /test_output/check_status.tsv
|
||||
[ -s /test_output/check_status.tsv ] || echo -e "success\tNo errors found" > /test_output/check_status.tsv
|
||||
|
||||
# But OOMs in stress test are allowed
|
||||
if rg 'OOM in dmesg|Signal 9' /test_output/check_status.tsv
|
||||
then
|
||||
sed -i 's/failure/success/' /test_output/check_status.tsv
|
||||
fi
|
||||
|
||||
collect_core_dumps
|
||||
|
@ -84,6 +84,7 @@ The BACKUP and RESTORE statements take a list of DATABASE and TABLE names, a des
|
||||
- `password` for the file on disk
|
||||
- `base_backup`: the destination of the previous backup of this source. For example, `Disk('backups', '1.zip')`
|
||||
- `structure_only`: if enabled, allows to only backup or restore the CREATE statements without the data of tables
|
||||
- `s3_storage_class`: the storage class used for S3 backup. For example, `STANDARD`
|
||||
|
||||
### Usage examples
|
||||
|
||||
|
@ -140,8 +140,8 @@ Time shifts for multiple days. Some pacific islands changed their timezone offse
|
||||
- [Type conversion functions](../../sql-reference/functions/type-conversion-functions.md)
|
||||
- [Functions for working with dates and times](../../sql-reference/functions/date-time-functions.md)
|
||||
- [Functions for working with arrays](../../sql-reference/functions/array-functions.md)
|
||||
- [The `date_time_input_format` setting](../../operations/settings/settings.md#settings-date_time_input_format)
|
||||
- [The `date_time_output_format` setting](../../operations/settings/settings.md#settings-date_time_output_format)
|
||||
- [The `date_time_input_format` setting](../../operations/settings/settings-formats.md#settings-date_time_input_format)
|
||||
- [The `date_time_output_format` setting](../../operations/settings/settings-formats.md#settings-date_time_output_format)
|
||||
- [The `timezone` server configuration parameter](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-timezone)
|
||||
- [The `session_timezone` setting](../../operations/settings/settings.md#session_timezone)
|
||||
- [Operators for working with dates and times](../../sql-reference/operators/index.md#operators-datetime)
|
||||
|
@ -30,6 +30,7 @@ public:
|
||||
String compression_method;
|
||||
int compression_level = -1;
|
||||
String password;
|
||||
String s3_storage_class;
|
||||
ContextPtr context;
|
||||
bool is_internal_backup = false;
|
||||
std::shared_ptr<IBackupCoordination> backup_coordination;
|
||||
|
@ -88,7 +88,7 @@ namespace
|
||||
request.SetMaxKeys(1);
|
||||
auto outcome = client.ListObjects(request);
|
||||
if (!outcome.IsSuccess())
|
||||
throw Exception::createDeprecated(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
|
||||
throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
|
||||
return outcome.GetResult().GetContents();
|
||||
}
|
||||
|
||||
@ -178,7 +178,7 @@ void BackupReaderS3::copyFileToDisk(const String & path_in_backup, size_t file_s
|
||||
|
||||
|
||||
BackupWriterS3::BackupWriterS3(
|
||||
const S3::URI & s3_uri_, const String & access_key_id_, const String & secret_access_key_, bool allow_s3_native_copy, const ContextPtr & context_)
|
||||
const S3::URI & s3_uri_, const String & access_key_id_, const String & secret_access_key_, bool allow_s3_native_copy, const String & storage_class_name, const ContextPtr & context_)
|
||||
: BackupWriterDefault(&Poco::Logger::get("BackupWriterS3"), context_)
|
||||
, s3_uri(s3_uri_)
|
||||
, client(makeS3Client(s3_uri_, access_key_id_, secret_access_key_, context_))
|
||||
@ -188,6 +188,7 @@ BackupWriterS3::BackupWriterS3(
|
||||
request_settings.updateFromSettings(context_->getSettingsRef());
|
||||
request_settings.max_single_read_retries = context_->getSettingsRef().s3_max_single_read_retries; // FIXME: Avoid taking value for endpoint
|
||||
request_settings.allow_native_copy = allow_s3_native_copy;
|
||||
request_settings.setStorageClassName(storage_class_name);
|
||||
}
|
||||
|
||||
void BackupWriterS3::copyFileFromDisk(const String & path_in_backup, DiskPtr src_disk, const String & src_path,
|
||||
@ -271,7 +272,7 @@ void BackupWriterS3::removeFile(const String & file_name)
|
||||
request.SetKey(fs::path(s3_uri.key) / file_name);
|
||||
auto outcome = client->DeleteObject(request);
|
||||
if (!outcome.IsSuccess() && !isNotFoundError(outcome.GetError().GetErrorType()))
|
||||
throw Exception::createDeprecated(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
|
||||
throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
|
||||
}
|
||||
|
||||
void BackupWriterS3::removeFiles(const Strings & file_names)
|
||||
@ -329,7 +330,7 @@ void BackupWriterS3::removeFilesBatch(const Strings & file_names)
|
||||
|
||||
auto outcome = client->DeleteObjects(request);
|
||||
if (!outcome.IsSuccess() && !isNotFoundError(outcome.GetError().GetErrorType()))
|
||||
throw Exception::createDeprecated(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
|
||||
throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -38,7 +38,7 @@ private:
|
||||
class BackupWriterS3 : public BackupWriterDefault
|
||||
{
|
||||
public:
|
||||
BackupWriterS3(const S3::URI & s3_uri_, const String & access_key_id_, const String & secret_access_key_, bool allow_s3_native_copy, const ContextPtr & context_);
|
||||
BackupWriterS3(const S3::URI & s3_uri_, const String & access_key_id_, const String & secret_access_key_, bool allow_s3_native_copy, const String & storage_class_name, const ContextPtr & context_);
|
||||
~BackupWriterS3() override;
|
||||
|
||||
bool fileExists(const String & file_name) override;
|
||||
|
@ -21,6 +21,7 @@ namespace ErrorCodes
|
||||
M(String, id) \
|
||||
M(String, compression_method) \
|
||||
M(String, password) \
|
||||
M(String, s3_storage_class) \
|
||||
M(Bool, structure_only) \
|
||||
M(Bool, async) \
|
||||
M(Bool, decrypt_files_from_encrypted_disks) \
|
||||
|
@ -25,6 +25,9 @@ struct BackupSettings
|
||||
/// Password used to encrypt the backup.
|
||||
String password;
|
||||
|
||||
/// S3 storage class.
|
||||
String s3_storage_class = "";
|
||||
|
||||
/// If this is set to true then only create queries will be written to backup,
|
||||
/// without the data of tables.
|
||||
bool structure_only = false;
|
||||
|
@ -344,6 +344,7 @@ void BackupsWorker::doBackup(
|
||||
backup_create_params.compression_method = backup_settings.compression_method;
|
||||
backup_create_params.compression_level = backup_settings.compression_level;
|
||||
backup_create_params.password = backup_settings.password;
|
||||
backup_create_params.s3_storage_class = backup_settings.s3_storage_class;
|
||||
backup_create_params.is_internal_backup = backup_settings.internal;
|
||||
backup_create_params.backup_coordination = backup_coordination;
|
||||
backup_create_params.backup_uuid = backup_settings.backup_uuid;
|
||||
|
@ -112,7 +112,7 @@ void registerBackupEngineS3(BackupFactory & factory)
|
||||
}
|
||||
else
|
||||
{
|
||||
auto writer = std::make_shared<BackupWriterS3>(S3::URI{s3_uri}, access_key_id, secret_access_key, params.allow_s3_native_copy, params.context);
|
||||
auto writer = std::make_shared<BackupWriterS3>(S3::URI{s3_uri}, access_key_id, secret_access_key, params.allow_s3_native_copy, params.s3_storage_class, params.context);
|
||||
return std::make_unique<BackupImpl>(
|
||||
backup_name_for_logging,
|
||||
archive_params,
|
||||
|
@ -153,7 +153,10 @@ Pool::Entry Pool::get(uint64_t wait_timeout)
|
||||
for (auto & connection : connections)
|
||||
{
|
||||
if (connection->ref_count == 0)
|
||||
{
|
||||
logger.test("Found free connection in pool, returning it to the caller");
|
||||
return Entry(connection, this);
|
||||
}
|
||||
}
|
||||
|
||||
logger.trace("(%s): Trying to allocate a new connection.", getDescription());
|
||||
|
@ -26,7 +26,7 @@ namespace mysqlxx
|
||||
*
|
||||
* void thread()
|
||||
* {
|
||||
* mysqlxx::Pool::Entry connection = pool.Get();
|
||||
* mysqlxx::Pool::Entry connection = pool.Get();
|
||||
* std::string s = connection->query("SELECT 'Hello, world!' AS world").use().fetch()["world"].getString();
|
||||
* }
|
||||
* TODO: simplify with PoolBase.
|
||||
|
@ -320,8 +320,6 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ
|
||||
request_info.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
|
||||
request_info.session_id = session_id;
|
||||
|
||||
std::lock_guard lock(push_request_mutex);
|
||||
|
||||
if (shutdown_called)
|
||||
return false;
|
||||
|
||||
@ -423,13 +421,10 @@ void KeeperDispatcher::shutdown()
|
||||
try
|
||||
{
|
||||
{
|
||||
std::lock_guard lock(push_request_mutex);
|
||||
|
||||
if (shutdown_called)
|
||||
if (shutdown_called.exchange(true))
|
||||
return;
|
||||
|
||||
LOG_DEBUG(log, "Shutting down storage dispatcher");
|
||||
shutdown_called = true;
|
||||
|
||||
if (session_cleaner_thread.joinable())
|
||||
session_cleaner_thread.join();
|
||||
@ -582,12 +577,9 @@ void KeeperDispatcher::sessionCleanerTask()
|
||||
.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(),
|
||||
.request = std::move(request),
|
||||
};
|
||||
{
|
||||
std::lock_guard lock(push_request_mutex);
|
||||
if (!requests_queue->push(std::move(request_info)))
|
||||
LOG_INFO(log, "Cannot push close request to queue while cleaning outdated sessions");
|
||||
CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets);
|
||||
}
|
||||
if (!requests_queue->push(std::move(request_info)))
|
||||
LOG_INFO(log, "Cannot push close request to queue while cleaning outdated sessions");
|
||||
CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets);
|
||||
|
||||
/// Remove session from registered sessions
|
||||
finishSession(dead_session);
|
||||
@ -607,6 +599,10 @@ void KeeperDispatcher::sessionCleanerTask()
|
||||
|
||||
void KeeperDispatcher::finishSession(int64_t session_id)
|
||||
{
|
||||
/// shutdown() method will cleanup sessions if needed
|
||||
if (shutdown_called)
|
||||
return;
|
||||
|
||||
{
|
||||
std::lock_guard lock(session_to_response_callback_mutex);
|
||||
auto session_it = session_to_response_callback.find(session_id);
|
||||
@ -698,12 +694,9 @@ int64_t KeeperDispatcher::getSessionID(int64_t session_timeout_ms)
|
||||
}
|
||||
|
||||
/// Push new session request to queue
|
||||
{
|
||||
std::lock_guard lock(push_request_mutex);
|
||||
if (!requests_queue->tryPush(std::move(request_info), session_timeout_ms))
|
||||
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Cannot push session id request to queue within session timeout");
|
||||
CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets);
|
||||
}
|
||||
if (!requests_queue->tryPush(std::move(request_info), session_timeout_ms))
|
||||
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Cannot push session id request to queue within session timeout");
|
||||
CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets);
|
||||
|
||||
if (future.wait_for(std::chrono::milliseconds(session_timeout_ms)) != std::future_status::ready)
|
||||
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Cannot receive session id within session timeout");
|
||||
@ -871,10 +864,7 @@ uint64_t KeeperDispatcher::getSnapDirSize() const
|
||||
Keeper4LWInfo KeeperDispatcher::getKeeper4LWInfo() const
|
||||
{
|
||||
Keeper4LWInfo result = server->getPartiallyFilled4LWInfo();
|
||||
{
|
||||
std::lock_guard lock(push_request_mutex);
|
||||
result.outstanding_requests_count = requests_queue->size();
|
||||
}
|
||||
result.outstanding_requests_count = requests_queue->size();
|
||||
{
|
||||
std::lock_guard lock(session_to_response_callback_mutex);
|
||||
result.alive_connections_count = session_to_response_callback.size();
|
||||
|
@ -27,8 +27,6 @@ using ZooKeeperResponseCallback = std::function<void(const Coordination::ZooKeep
|
||||
class KeeperDispatcher
|
||||
{
|
||||
private:
|
||||
mutable std::mutex push_request_mutex;
|
||||
|
||||
using RequestsQueue = ConcurrentBoundedQueue<KeeperStorage::RequestForSession>;
|
||||
using SessionToResponseCallback = std::unordered_map<int64_t, ZooKeeperResponseCallback>;
|
||||
using ClusterUpdateQueue = ConcurrentBoundedQueue<ClusterUpdateAction>;
|
||||
|
@ -794,8 +794,14 @@ bool KeeperServer::applyConfigUpdate(const ClusterUpdateAction & action)
|
||||
std::lock_guard _{server_write_mutex};
|
||||
|
||||
if (const auto * add = std::get_if<AddRaftServer>(&action))
|
||||
return raft_instance->get_srv_config(add->id) != nullptr
|
||||
|| raft_instance->add_srv(static_cast<nuraft::srv_config>(*add))->get_accepted();
|
||||
{
|
||||
if (raft_instance->get_srv_config(add->id) != nullptr)
|
||||
return true;
|
||||
|
||||
auto resp = raft_instance->add_srv(static_cast<nuraft::srv_config>(*add));
|
||||
resp->get();
|
||||
return resp->get_accepted();
|
||||
}
|
||||
else if (const auto * remove = std::get_if<RemoveRaftServer>(&action))
|
||||
{
|
||||
if (remove->id == raft_instance->get_leader())
|
||||
@ -807,8 +813,12 @@ bool KeeperServer::applyConfigUpdate(const ClusterUpdateAction & action)
|
||||
return false;
|
||||
}
|
||||
|
||||
return raft_instance->get_srv_config(remove->id) == nullptr
|
||||
|| raft_instance->remove_srv(remove->id)->get_accepted();
|
||||
if (raft_instance->get_srv_config(remove->id) == nullptr)
|
||||
return true;
|
||||
|
||||
auto resp = raft_instance->remove_srv(remove->id);
|
||||
resp->get();
|
||||
return resp->get_accepted();
|
||||
}
|
||||
else if (const auto * update = std::get_if<UpdateRaftServerPriority>(&action))
|
||||
{
|
||||
|
@ -65,6 +65,7 @@ void DatabaseMaterializedMySQL::setException(const std::exception_ptr & exceptio
|
||||
|
||||
void DatabaseMaterializedMySQL::startupTables(ThreadPool & thread_pool, LoadingStrictnessLevel mode)
|
||||
{
|
||||
LOG_TRACE(log, "Starting MaterializeMySQL tables");
|
||||
DatabaseAtomic::startupTables(thread_pool, mode);
|
||||
|
||||
if (mode < LoadingStrictnessLevel::FORCE_ATTACH)
|
||||
@ -122,6 +123,7 @@ void DatabaseMaterializedMySQL::alterTable(ContextPtr context_, const StorageID
|
||||
|
||||
void DatabaseMaterializedMySQL::drop(ContextPtr context_)
|
||||
{
|
||||
LOG_TRACE(log, "Dropping MaterializeMySQL database");
|
||||
/// Remove metadata info
|
||||
fs::path metadata(getMetadataPath() + "/.metadata");
|
||||
|
||||
|
@ -11,6 +11,7 @@
|
||||
#include <Databases/DatabaseAtomic.h>
|
||||
#include <Databases/MySQL/MaterializedMySQLSettings.h>
|
||||
#include <Databases/MySQL/MaterializedMySQLSyncThread.h>
|
||||
#include <Common/logger_useful.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
@ -1,3 +1,4 @@
|
||||
#include "Common/logger_useful.h"
|
||||
#include "config.h"
|
||||
|
||||
#if USE_MYSQL
|
||||
@ -499,7 +500,10 @@ bool MaterializedMySQLSyncThread::prepareSynchronized(MaterializeMetadata & meta
|
||||
{
|
||||
throw;
|
||||
}
|
||||
catch (const mysqlxx::ConnectionFailed &) {}
|
||||
catch (const mysqlxx::ConnectionFailed & ex)
|
||||
{
|
||||
LOG_TRACE(log, "Connection to MySQL failed {}", ex.displayText());
|
||||
}
|
||||
catch (const mysqlxx::BadQuery & e)
|
||||
{
|
||||
// Lost connection to MySQL server during query
|
||||
|
@ -135,7 +135,7 @@ private:
|
||||
return result;
|
||||
}
|
||||
|
||||
throw Exception(ErrorCodes::S3_ERROR, "Could not list objects in bucket {} with prefix {}, S3 exception: {}, message: {}",
|
||||
throw S3Exception(outcome.GetError().GetErrorType(), "Could not list objects in bucket {} with prefix {}, S3 exception: {}, message: {}",
|
||||
quoteString(request.GetBucket()), quoteString(request.GetPrefix()),
|
||||
backQuote(outcome.GetError().GetExceptionName()), quoteString(outcome.GetError().GetMessage()));
|
||||
}
|
||||
|
@ -31,7 +31,7 @@ namespace ErrorCodes
|
||||
message.empty() ? "" : ": " + message);
|
||||
}
|
||||
|
||||
Poco::AutoPtr<Poco::XML::Document> getDiskConfigurationFromASTImpl(const std::string & root_name, const ASTs & disk_args, ContextPtr context)
|
||||
Poco::AutoPtr<Poco::XML::Document> getDiskConfigurationFromASTImpl(const ASTs & disk_args, ContextPtr context)
|
||||
{
|
||||
if (disk_args.empty())
|
||||
throwBadConfiguration("expected non-empty list of arguments");
|
||||
@ -39,8 +39,6 @@ Poco::AutoPtr<Poco::XML::Document> getDiskConfigurationFromASTImpl(const std::st
|
||||
Poco::AutoPtr<Poco::XML::Document> xml_document(new Poco::XML::Document());
|
||||
Poco::AutoPtr<Poco::XML::Element> root(xml_document->createElement("disk"));
|
||||
xml_document->appendChild(root);
|
||||
Poco::AutoPtr<Poco::XML::Element> disk_configuration(xml_document->createElement(root_name));
|
||||
root->appendChild(disk_configuration);
|
||||
|
||||
for (const auto & arg : disk_args)
|
||||
{
|
||||
@ -62,7 +60,7 @@ Poco::AutoPtr<Poco::XML::Document> getDiskConfigurationFromASTImpl(const std::st
|
||||
|
||||
const std::string & key = key_identifier->name();
|
||||
Poco::AutoPtr<Poco::XML::Element> key_element(xml_document->createElement(key));
|
||||
disk_configuration->appendChild(key_element);
|
||||
root->appendChild(key_element);
|
||||
|
||||
if (!function_args[1]->as<ASTLiteral>() && !function_args[1]->as<ASTIdentifier>())
|
||||
throwBadConfiguration("expected values to be literals or identifiers");
|
||||
@ -75,9 +73,9 @@ Poco::AutoPtr<Poco::XML::Document> getDiskConfigurationFromASTImpl(const std::st
|
||||
return xml_document;
|
||||
}
|
||||
|
||||
DiskConfigurationPtr getDiskConfigurationFromAST(const std::string & root_name, const ASTs & disk_args, ContextPtr context)
|
||||
DiskConfigurationPtr getDiskConfigurationFromAST(const ASTs & disk_args, ContextPtr context)
|
||||
{
|
||||
auto xml_document = getDiskConfigurationFromASTImpl(root_name, disk_args, context);
|
||||
auto xml_document = getDiskConfigurationFromASTImpl(disk_args, context);
|
||||
Poco::AutoPtr<Poco::Util::XMLConfiguration> conf(new Poco::Util::XMLConfiguration());
|
||||
conf->load(xml_document);
|
||||
return conf;
|
||||
|
@ -14,19 +14,19 @@ using DiskConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;
|
||||
/**
|
||||
* Transform a list of pairs ( key1=value1, key2=value2, ... ), where keys and values are ASTLiteral or ASTIdentifier
|
||||
* into
|
||||
* <root_name>
|
||||
* <disk>
|
||||
* <key1>value1</key1>
|
||||
* <key2>value2</key2>
|
||||
* ...
|
||||
* </root_name>
|
||||
* </disk>
|
||||
*
|
||||
* Used in case disk configuration is passed via AST when creating
|
||||
* a disk object on-the-fly without any configuration file.
|
||||
*/
|
||||
DiskConfigurationPtr getDiskConfigurationFromAST(const std::string & root_name, const ASTs & disk_args, ContextPtr context);
|
||||
DiskConfigurationPtr getDiskConfigurationFromAST(const ASTs & disk_args, ContextPtr context);
|
||||
|
||||
/// The same as above function, but return XML::Document for easier modification of result configuration.
|
||||
[[ maybe_unused ]] Poco::AutoPtr<Poco::XML::Document> getDiskConfigurationFromASTImpl(const std::string & root_name, const ASTs & disk_args, ContextPtr context);
|
||||
[[ maybe_unused ]] Poco::AutoPtr<Poco::XML::Document> getDiskConfigurationFromASTImpl(const ASTs & disk_args, ContextPtr context);
|
||||
|
||||
/*
|
||||
* A reverse function.
|
||||
|
@ -26,8 +26,16 @@ namespace
|
||||
{
|
||||
std::string getOrCreateDiskFromDiskAST(const ASTFunction & function, ContextPtr context)
|
||||
{
|
||||
const auto * function_args_expr = assert_cast<const ASTExpressionList *>(function.arguments.get());
|
||||
const auto & function_args = function_args_expr->children;
|
||||
auto config = getDiskConfigurationFromAST(function_args, context);
|
||||
|
||||
std::string disk_name;
|
||||
if (function.name == "disk")
|
||||
if (config->has("name"))
|
||||
{
|
||||
disk_name = config->getString("name");
|
||||
}
|
||||
else
|
||||
{
|
||||
/// We need a unique name for a created custom disk, but it needs to be the same
|
||||
/// after table is reattached or server is restarted, so take a hash of the disk
|
||||
@ -36,21 +44,9 @@ namespace
|
||||
disk_name = DiskSelector::TMP_INTERNAL_DISK_PREFIX
|
||||
+ toString(sipHash128(disk_setting_string.data(), disk_setting_string.size()));
|
||||
}
|
||||
else
|
||||
{
|
||||
static constexpr std::string_view custom_disk_prefix = "disk_";
|
||||
|
||||
if (function.name.size() <= custom_disk_prefix.size() || !function.name.starts_with(custom_disk_prefix))
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid disk name: {}", function.name);
|
||||
|
||||
disk_name = function.name.substr(custom_disk_prefix.size());
|
||||
}
|
||||
|
||||
auto result_disk = context->getOrCreateDisk(disk_name, [&](const DisksMap & disks_map) -> DiskPtr {
|
||||
const auto * function_args_expr = assert_cast<const ASTExpressionList *>(function.arguments.get());
|
||||
const auto & function_args = function_args_expr->children;
|
||||
auto config = getDiskConfigurationFromAST(disk_name, function_args, context);
|
||||
auto disk = DiskFactory::instance().create(disk_name, *config, disk_name, context, disks_map);
|
||||
auto disk = DiskFactory::instance().create(disk_name, *config, "", context, disks_map);
|
||||
/// Mark that disk can be used without storage policy.
|
||||
disk->markDiskAsCustom();
|
||||
return disk;
|
||||
|
@ -783,7 +783,7 @@ namespace
|
||||
if (!outcome.IsSuccess())
|
||||
{
|
||||
abortMultipartUpload();
|
||||
throw Exception::createDeprecated(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
|
||||
throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
|
||||
}
|
||||
|
||||
return outcome.GetResult().GetCopyPartResult().GetETag();
|
||||
|
@ -85,7 +85,7 @@ ObjectInfo getObjectInfo(
|
||||
}
|
||||
else if (throw_on_error)
|
||||
{
|
||||
throw DB::Exception(ErrorCodes::S3_ERROR,
|
||||
throw S3Exception(error.GetErrorType(),
|
||||
"Failed to get object info: {}. HTTP response code: {}",
|
||||
error.GetMessage(), static_cast<size_t>(error.GetResponseCode()));
|
||||
}
|
||||
|
@ -764,7 +764,7 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::getTableProperti
|
||||
/// Table function without columns list.
|
||||
auto table_function_ast = create.as_table_function->ptr();
|
||||
auto table_function = TableFunctionFactory::instance().get(table_function_ast, getContext());
|
||||
properties.columns = table_function->getActualTableStructure(getContext());
|
||||
properties.columns = table_function->getActualTableStructure(getContext(), /*is_insert_query*/ true);
|
||||
}
|
||||
else if (create.is_dictionary)
|
||||
{
|
||||
|
@ -96,7 +96,7 @@ BlockIO InterpreterDescribeQuery::execute()
|
||||
else if (table_expression.table_function)
|
||||
{
|
||||
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_expression.table_function, getContext());
|
||||
auto table_function_column_descriptions = table_function_ptr->getActualTableStructure(getContext());
|
||||
auto table_function_column_descriptions = table_function_ptr->getActualTableStructure(getContext(), /*is_insert_query*/ true);
|
||||
for (const auto & table_function_column_description : table_function_column_descriptions)
|
||||
columns.emplace_back(table_function_column_description);
|
||||
}
|
||||
|
@ -215,7 +215,7 @@ bool ParserSetQuery::parseNameValuePair(SettingChange & change, IParser::Pos & p
|
||||
else if (ParserKeyword("FALSE").ignore(pos, expected))
|
||||
value = std::make_shared<ASTLiteral>(Field(static_cast<UInt64>(0)));
|
||||
/// for SETTINGS disk=disk(type='s3', path='', ...)
|
||||
else if (function_p.parse(pos, function_ast, expected) && function_ast->as<ASTFunction>()->name.starts_with("disk"))
|
||||
else if (function_p.parse(pos, function_ast, expected) && function_ast->as<ASTFunction>()->name == "disk")
|
||||
{
|
||||
tryGetIdentifierNameInto(name, change.name);
|
||||
change.value = createFieldFromAST(function_ast);
|
||||
@ -280,7 +280,7 @@ bool ParserSetQuery::parseNameValuePairWithParameterOrDefault(
|
||||
node = std::make_shared<ASTLiteral>(Field(static_cast<UInt64>(1)));
|
||||
else if (ParserKeyword("FALSE").ignore(pos, expected))
|
||||
node = std::make_shared<ASTLiteral>(Field(static_cast<UInt64>(0)));
|
||||
else if (function_p.parse(pos, function_ast, expected) && function_ast->as<ASTFunction>()->name.starts_with("disk"))
|
||||
else if (function_p.parse(pos, function_ast, expected) && function_ast->as<ASTFunction>()->name == "disk")
|
||||
{
|
||||
change.name = name;
|
||||
change.value = createFieldFromAST(function_ast);
|
||||
|
@ -57,8 +57,8 @@ std::vector<String> S3DataLakeMetadataReadHelper::listFiles(
|
||||
{
|
||||
outcome = client->ListObjectsV2(request);
|
||||
if (!outcome.IsSuccess())
|
||||
throw Exception(
|
||||
ErrorCodes::S3_ERROR,
|
||||
throw S3Exception(
|
||||
outcome.GetError().GetErrorType(),
|
||||
"Could not list objects in bucket {} with key {}, S3 exception: {}, message: {}",
|
||||
quoteString(bucket),
|
||||
quoteString(base_configuration.url.key),
|
||||
|
@ -86,7 +86,7 @@ const std::unordered_set<std::string_view> optional_configuration_keys = {
|
||||
|
||||
bool isConnectionString(const std::string & candidate)
|
||||
{
|
||||
return candidate.starts_with("DefaultEndpointsProtocol");
|
||||
return !candidate.starts_with("http");
|
||||
}
|
||||
|
||||
}
|
||||
@ -257,7 +257,7 @@ void registerStorageAzureBlob(StorageFactory & factory)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments");
|
||||
|
||||
auto configuration = StorageAzureBlob::getConfiguration(engine_args, args.getLocalContext());
|
||||
auto client = StorageAzureBlob::createClient(configuration);
|
||||
auto client = StorageAzureBlob::createClient(configuration, /* is_read_only */ false);
|
||||
// Use format settings from global server context + settings from
|
||||
// the SETTINGS clause of the create query. Settings from current
|
||||
// session and user are ignored.
|
||||
@ -309,58 +309,113 @@ void registerStorageAzureBlob(StorageFactory & factory)
|
||||
});
|
||||
}
|
||||
|
||||
AzureClientPtr StorageAzureBlob::createClient(StorageAzureBlob::Configuration configuration)
|
||||
static bool containerExists(std::unique_ptr<BlobServiceClient> &blob_service_client, std::string container_name)
|
||||
{
|
||||
Azure::Storage::Blobs::ListBlobContainersOptions options;
|
||||
options.Prefix = container_name;
|
||||
options.PageSizeHint = 1;
|
||||
|
||||
auto containers_list_response = blob_service_client->ListBlobContainers(options);
|
||||
auto containers_list = containers_list_response.BlobContainers;
|
||||
|
||||
for (const auto & container : containers_list)
|
||||
{
|
||||
if (container_name == container.Name)
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
AzureClientPtr StorageAzureBlob::createClient(StorageAzureBlob::Configuration configuration, bool is_read_only)
|
||||
{
|
||||
AzureClientPtr result;
|
||||
|
||||
if (configuration.is_connection_string)
|
||||
{
|
||||
std::unique_ptr<BlobServiceClient> blob_service_client = std::make_unique<BlobServiceClient>(BlobServiceClient::CreateFromConnectionString(configuration.connection_url));
|
||||
result = std::make_unique<BlobContainerClient>(BlobContainerClient::CreateFromConnectionString(configuration.connection_url, configuration.container));
|
||||
result->CreateIfNotExists();
|
||||
}
|
||||
else
|
||||
{
|
||||
if (configuration.account_name.has_value() && configuration.account_key.has_value())
|
||||
bool container_exists = containerExists(blob_service_client,configuration.container);
|
||||
|
||||
if (!container_exists)
|
||||
{
|
||||
auto storage_shared_key_credential = std::make_shared<Azure::Storage::StorageSharedKeyCredential>(*configuration.account_name, *configuration.account_key);
|
||||
auto blob_service_client = std::make_unique<BlobServiceClient>(configuration.connection_url, storage_shared_key_credential);
|
||||
if (is_read_only)
|
||||
throw Exception(
|
||||
ErrorCodes::DATABASE_ACCESS_DENIED,
|
||||
"AzureBlobStorage container does not exist '{}'",
|
||||
configuration.container);
|
||||
|
||||
try
|
||||
{
|
||||
result = std::make_unique<BlobContainerClient>(blob_service_client->CreateBlobContainer(configuration.container).Value);
|
||||
}
|
||||
catch (const Azure::Storage::StorageException & e)
|
||||
result->CreateIfNotExists();
|
||||
} catch (const Azure::Storage::StorageException & e)
|
||||
{
|
||||
if (e.StatusCode == Azure::Core::Http::HttpStatusCode::Conflict)
|
||||
{
|
||||
auto final_url = configuration.connection_url
|
||||
+ (configuration.connection_url.back() == '/' ? "" : "/")
|
||||
+ configuration.container;
|
||||
|
||||
result = std::make_unique<BlobContainerClient>(final_url, storage_shared_key_credential);
|
||||
}
|
||||
else
|
||||
if (!(e.StatusCode == Azure::Core::Http::HttpStatusCode::Conflict
|
||||
&& e.ReasonPhrase == "The specified container already exists."))
|
||||
{
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
std::shared_ptr<Azure::Storage::StorageSharedKeyCredential> storage_shared_key_credential;
|
||||
if (configuration.account_name.has_value() && configuration.account_key.has_value())
|
||||
{
|
||||
storage_shared_key_credential
|
||||
= std::make_shared<Azure::Storage::StorageSharedKeyCredential>(*configuration.account_name, *configuration.account_key);
|
||||
}
|
||||
|
||||
std::unique_ptr<BlobServiceClient> blob_service_client;
|
||||
if (storage_shared_key_credential)
|
||||
{
|
||||
blob_service_client = std::make_unique<BlobServiceClient>(configuration.connection_url, storage_shared_key_credential);
|
||||
}
|
||||
else
|
||||
{
|
||||
auto managed_identity_credential = std::make_shared<Azure::Identity::ManagedIdentityCredential>();
|
||||
auto blob_service_client = std::make_unique<BlobServiceClient>(configuration.connection_url, managed_identity_credential);
|
||||
blob_service_client = std::make_unique<BlobServiceClient>(configuration.connection_url);
|
||||
}
|
||||
|
||||
bool container_exists = containerExists(blob_service_client,configuration.container);
|
||||
|
||||
std::string final_url;
|
||||
size_t pos = configuration.connection_url.find('?');
|
||||
if (pos != std::string::npos)
|
||||
{
|
||||
auto url_without_sas = configuration.connection_url.substr(0, pos);
|
||||
final_url = url_without_sas + (url_without_sas.back() == '/' ? "" : "/") + configuration.container
|
||||
+ configuration.connection_url.substr(pos);
|
||||
}
|
||||
else
|
||||
final_url
|
||||
= configuration.connection_url + (configuration.connection_url.back() == '/' ? "" : "/") + configuration.container;
|
||||
|
||||
if (container_exists)
|
||||
{
|
||||
if (storage_shared_key_credential)
|
||||
result = std::make_unique<BlobContainerClient>(final_url, storage_shared_key_credential);
|
||||
else
|
||||
result = std::make_unique<BlobContainerClient>(final_url);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (is_read_only)
|
||||
throw Exception(
|
||||
ErrorCodes::DATABASE_ACCESS_DENIED,
|
||||
"AzureBlobStorage container does not exist '{}'",
|
||||
configuration.container);
|
||||
try
|
||||
{
|
||||
result = std::make_unique<BlobContainerClient>(blob_service_client->CreateBlobContainer(configuration.container).Value);
|
||||
}
|
||||
catch (const Azure::Storage::StorageException & e)
|
||||
} catch (const Azure::Storage::StorageException & e)
|
||||
{
|
||||
if (e.StatusCode == Azure::Core::Http::HttpStatusCode::Conflict)
|
||||
if (e.StatusCode == Azure::Core::Http::HttpStatusCode::Conflict
|
||||
&& e.ReasonPhrase == "The specified container already exists.")
|
||||
{
|
||||
auto final_url = configuration.connection_url
|
||||
+ (configuration.connection_url.back() == '/' ? "" : "/")
|
||||
+ configuration.container;
|
||||
|
||||
result = std::make_unique<BlobContainerClient>(final_url, managed_identity_credential);
|
||||
if (storage_shared_key_credential)
|
||||
result = std::make_unique<BlobContainerClient>(final_url, storage_shared_key_credential);
|
||||
else
|
||||
result = std::make_unique<BlobContainerClient>(final_url);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -438,7 +493,7 @@ void StorageAzureBlob::truncate(const ASTPtr &, const StorageMetadataPtr &, Cont
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::DATABASE_ACCESS_DENIED,
|
||||
"S3 key '{}' contains globs, so the table is in readonly mode",
|
||||
"AzureBlobStorage key '{}' contains globs, so the table is in readonly mode",
|
||||
configuration.blob_path);
|
||||
}
|
||||
|
||||
@ -1203,7 +1258,7 @@ ColumnsDescription StorageAzureBlob::getTableStructureFromData(
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
/// S3 file iterator could get new keys after new iteration, check them in schema cache.
|
||||
///AzureBlobStorage file iterator could get new keys after new iteration, check them in schema cache.
|
||||
if (ctx->getSettingsRef().schema_inference_use_cache_for_azure && read_keys.size() > prev_read_keys_size)
|
||||
{
|
||||
columns_from_cache = tryGetColumnsFromCache(read_keys.begin() + prev_read_keys_size, read_keys.end(), configuration, format_settings, ctx);
|
||||
|
@ -65,7 +65,7 @@ public:
|
||||
ASTPtr partition_by_);
|
||||
|
||||
static StorageAzureBlob::Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context);
|
||||
static AzureClientPtr createClient(StorageAzureBlob::Configuration configuration);
|
||||
static AzureClientPtr createClient(StorageAzureBlob::Configuration configuration, bool is_read_only);
|
||||
|
||||
static AzureObjectStorage::SettingsPtr createSettings(ContextPtr local_context);
|
||||
|
||||
|
@ -245,7 +245,7 @@ private:
|
||||
|
||||
if (!outcome.IsSuccess())
|
||||
{
|
||||
throw Exception(ErrorCodes::S3_ERROR, "Could not list objects in bucket {} with prefix {}, S3 exception: {}, message: {}",
|
||||
throw S3Exception(outcome.GetError().GetErrorType(), "Could not list objects in bucket {} with prefix {}, S3 exception: {}, message: {}",
|
||||
quoteString(request.GetBucket()), quoteString(request.GetPrefix()),
|
||||
backQuote(outcome.GetError().GetExceptionName()), quoteString(outcome.GetError().GetMessage()));
|
||||
}
|
||||
@ -1195,7 +1195,7 @@ void StorageS3::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &,
|
||||
if (!response.IsSuccess())
|
||||
{
|
||||
const auto & err = response.GetError();
|
||||
throw Exception(ErrorCodes::S3_ERROR, "{}: {}", std::to_string(static_cast<int>(err.GetErrorType())), err.GetMessage());
|
||||
throw S3Exception(err.GetMessage(), err.GetErrorType());
|
||||
}
|
||||
|
||||
for (const auto & error : response.GetResult().GetErrors())
|
||||
|
@ -77,6 +77,8 @@ struct S3Settings
|
||||
|
||||
const PartUploadSettings & getUploadSettings() const { return upload_settings; }
|
||||
|
||||
void setStorageClassName(const String & storage_class_name) { upload_settings.storage_class_name = storage_class_name; }
|
||||
|
||||
RequestSettings() = default;
|
||||
explicit RequestSettings(const Settings & settings);
|
||||
explicit RequestSettings(const NamedCollection & collection);
|
||||
|
@ -38,7 +38,7 @@ ColumnsDescription getStructureOfRemoteTableInShard(
|
||||
if (shard_info.isLocal())
|
||||
{
|
||||
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_func_ptr, context);
|
||||
return table_function_ptr->getActualTableStructure(context);
|
||||
return table_function_ptr->getActualTableStructure(context, /*is_insert_query*/ true);
|
||||
}
|
||||
|
||||
auto table_func_name = queryToString(table_func_ptr);
|
||||
|
@ -49,13 +49,14 @@ namespace DB
|
||||
actual_columns = parseColumnsListFromString(table_structure, context_);
|
||||
}
|
||||
|
||||
ColumnsDescription TableFunctionHive::getActualTableStructure(ContextPtr /*context_*/) const { return actual_columns; }
|
||||
ColumnsDescription TableFunctionHive::getActualTableStructure(ContextPtr /*context_*/, bool /*is_insert_query*/) const { return actual_columns; }
|
||||
|
||||
StoragePtr TableFunctionHive::executeImpl(
|
||||
const ASTPtr & /*ast_function_*/,
|
||||
ContextPtr context_,
|
||||
const std::string & table_name_,
|
||||
ColumnsDescription /*cached_columns_*/) const
|
||||
ColumnsDescription /*cached_columns_*/,
|
||||
bool /*is_insert_query*/) const
|
||||
{
|
||||
const Settings & settings = context_->getSettings();
|
||||
ParserExpression partition_by_parser;
|
||||
|
@ -17,10 +17,10 @@ public:
|
||||
bool hasStaticStructure() const override { return true; }
|
||||
|
||||
StoragePtr executeImpl(
|
||||
const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override;
|
||||
const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
|
||||
|
||||
const char * getStorageTypeName() const override { return storage_type_name; }
|
||||
ColumnsDescription getActualTableStructure(ContextPtr) const override;
|
||||
ColumnsDescription getActualTableStructure(ContextPtr, bool is_insert_query) const override;
|
||||
void parseArguments(const ASTPtr & ast_function_, ContextPtr context_) override;
|
||||
|
||||
private:
|
||||
|
@ -34,15 +34,15 @@ StoragePtr ITableFunction::execute(const ASTPtr & ast_function, ContextPtr conte
|
||||
auto context_to_use = use_global_context ? context->getGlobalContext() : context;
|
||||
|
||||
if (cached_columns.empty())
|
||||
return executeImpl(ast_function, context, table_name, std::move(cached_columns));
|
||||
return executeImpl(ast_function, context, table_name, std::move(cached_columns), is_insert_query);
|
||||
|
||||
if (hasStaticStructure() && cached_columns == getActualTableStructure(context))
|
||||
return executeImpl(ast_function, context_to_use, table_name, std::move(cached_columns));
|
||||
if (hasStaticStructure() && cached_columns == getActualTableStructure(context,is_insert_query))
|
||||
return executeImpl(ast_function, context_to_use, table_name, std::move(cached_columns), is_insert_query);
|
||||
|
||||
auto this_table_function = shared_from_this();
|
||||
auto get_storage = [=]() -> StoragePtr
|
||||
{
|
||||
return this_table_function->executeImpl(ast_function, context_to_use, table_name, cached_columns);
|
||||
return this_table_function->executeImpl(ast_function, context_to_use, table_name, cached_columns, is_insert_query);
|
||||
};
|
||||
|
||||
/// It will request actual table structure and create underlying storage lazily
|
||||
|
@ -58,7 +58,7 @@ public:
|
||||
virtual void parseArguments(const ASTPtr & /*ast_function*/, ContextPtr /*context*/) {}
|
||||
|
||||
/// Returns actual table structure probably requested from remote server, may fail
|
||||
virtual ColumnsDescription getActualTableStructure(ContextPtr /*context*/) const = 0;
|
||||
virtual ColumnsDescription getActualTableStructure(ContextPtr /*context*/, bool is_insert_query) const = 0;
|
||||
|
||||
/// Check if table function needs a structure hint from SELECT query in case of
|
||||
/// INSERT INTO FUNCTION ... SELECT ... and INSERT INTO ... SELECT ... FROM table_function(...)
|
||||
@ -89,7 +89,7 @@ protected:
|
||||
|
||||
private:
|
||||
virtual StoragePtr executeImpl(
|
||||
const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const = 0;
|
||||
const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const = 0;
|
||||
|
||||
virtual const char * getStorageTypeName() const = 0;
|
||||
};
|
||||
|
@ -26,7 +26,8 @@ protected:
|
||||
const ASTPtr & /*ast_function*/,
|
||||
ContextPtr context,
|
||||
const std::string & table_name,
|
||||
ColumnsDescription /*cached_columns*/) const override
|
||||
ColumnsDescription /*cached_columns*/,
|
||||
bool /*is_insert_query*/) const override
|
||||
{
|
||||
ColumnsDescription columns;
|
||||
if (TableFunction::configuration.structure != "auto")
|
||||
@ -42,7 +43,7 @@ protected:
|
||||
|
||||
const char * getStorageTypeName() const override { return Storage::name; }
|
||||
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context) const override
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const override
|
||||
{
|
||||
if (TableFunction::configuration.structure == "auto")
|
||||
{
|
||||
|
@ -110,7 +110,7 @@ void ITableFunctionFileLike::addColumnsStructureToArguments(ASTs & args, const S
|
||||
}
|
||||
}
|
||||
|
||||
StoragePtr ITableFunctionFileLike::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
|
||||
StoragePtr ITableFunctionFileLike::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool /*is_insert_query*/) const
|
||||
{
|
||||
ColumnsDescription columns;
|
||||
if (structure != "auto")
|
||||
|
@ -48,7 +48,7 @@ protected:
|
||||
ColumnsDescription structure_hint;
|
||||
|
||||
private:
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override;
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
|
||||
|
||||
virtual StoragePtr getStorage(
|
||||
const String & source, const String & format, const ColumnsDescription & columns, ContextPtr global_context,
|
||||
|
@ -61,7 +61,7 @@ void ITableFunctionXDBC::startBridgeIfNot(ContextPtr context) const
|
||||
}
|
||||
}
|
||||
|
||||
ColumnsDescription ITableFunctionXDBC::getActualTableStructure(ContextPtr context) const
|
||||
ColumnsDescription ITableFunctionXDBC::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
|
||||
{
|
||||
startBridgeIfNot(context);
|
||||
|
||||
@ -92,10 +92,10 @@ ColumnsDescription ITableFunctionXDBC::getActualTableStructure(ContextPtr contex
|
||||
return ColumnsDescription{columns};
|
||||
}
|
||||
|
||||
StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
|
||||
StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const
|
||||
{
|
||||
startBridgeIfNot(context);
|
||||
auto columns = getActualTableStructure(context);
|
||||
auto columns = getActualTableStructure(context, is_insert_query);
|
||||
auto result = std::make_shared<StorageXDBC>(
|
||||
StorageID(getDatabaseName(), table_name), schema_name, remote_table_name, columns, ConstraintsDescription{}, String{}, context, helper);
|
||||
result->startup();
|
||||
|
@ -16,7 +16,7 @@ namespace DB
|
||||
class ITableFunctionXDBC : public ITableFunction
|
||||
{
|
||||
private:
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override;
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
|
||||
|
||||
/* A factory method to create bridge helper, that will assist in remote interaction */
|
||||
virtual BridgeHelperPtr createBridgeHelper(ContextPtr context,
|
||||
@ -24,7 +24,7 @@ private:
|
||||
const std::string & connection_string_,
|
||||
bool use_connection_pooling_) const = 0;
|
||||
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
|
||||
|
||||
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
|
||||
|
||||
|
@ -39,7 +39,7 @@ namespace
|
||||
|
||||
bool isConnectionString(const std::string & candidate)
|
||||
{
|
||||
return candidate.starts_with("DefaultEndpointsProtocol");
|
||||
return !candidate.starts_with("http");
|
||||
}
|
||||
|
||||
}
|
||||
@ -193,12 +193,12 @@ void TableFunctionAzureBlobStorage::parseArguments(const ASTPtr & ast_function,
|
||||
configuration = parseArgumentsImpl(args, context);
|
||||
}
|
||||
|
||||
ColumnsDescription TableFunctionAzureBlobStorage::getActualTableStructure(ContextPtr context) const
|
||||
ColumnsDescription TableFunctionAzureBlobStorage::getActualTableStructure(ContextPtr context, bool is_insert_query) const
|
||||
{
|
||||
if (configuration.structure == "auto")
|
||||
{
|
||||
context->checkAccess(getSourceAccessType());
|
||||
auto client = StorageAzureBlob::createClient(configuration);
|
||||
auto client = StorageAzureBlob::createClient(configuration, !is_insert_query);
|
||||
auto settings = StorageAzureBlob::createSettings(context);
|
||||
|
||||
auto object_storage = std::make_unique<AzureObjectStorage>("AzureBlobStorageTableFunction", std::move(client), std::move(settings));
|
||||
@ -213,9 +213,9 @@ bool TableFunctionAzureBlobStorage::supportsReadingSubsetOfColumns()
|
||||
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format);
|
||||
}
|
||||
|
||||
StoragePtr TableFunctionAzureBlobStorage::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
|
||||
StoragePtr TableFunctionAzureBlobStorage::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const
|
||||
{
|
||||
auto client = StorageAzureBlob::createClient(configuration);
|
||||
auto client = StorageAzureBlob::createClient(configuration, !is_insert_query);
|
||||
auto settings = StorageAzureBlob::createSettings(context);
|
||||
|
||||
ColumnsDescription columns;
|
||||
|
@ -54,11 +54,12 @@ protected:
|
||||
const ASTPtr & ast_function,
|
||||
ContextPtr context,
|
||||
const std::string & table_name,
|
||||
ColumnsDescription cached_columns) const override;
|
||||
ColumnsDescription cached_columns,
|
||||
bool is_insert_query) const override;
|
||||
|
||||
const char * getStorageTypeName() const override { return "Azure"; }
|
||||
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
|
||||
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
|
||||
|
||||
mutable StorageAzureBlob::Configuration configuration;
|
||||
|
@ -43,7 +43,7 @@ void TableFunctionDictionary::parseArguments(const ASTPtr & ast_function, Contex
|
||||
dictionary_name = checkAndGetLiteralArgument<String>(args[0], "dictionary_name");
|
||||
}
|
||||
|
||||
ColumnsDescription TableFunctionDictionary::getActualTableStructure(ContextPtr context) const
|
||||
ColumnsDescription TableFunctionDictionary::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
|
||||
{
|
||||
const ExternalDictionariesLoader & external_loader = context->getExternalDictionariesLoader();
|
||||
std::string resolved_name = external_loader.resolveDictionaryName(dictionary_name, context->getCurrentDatabase());
|
||||
@ -76,10 +76,10 @@ ColumnsDescription TableFunctionDictionary::getActualTableStructure(ContextPtr c
|
||||
}
|
||||
|
||||
StoragePtr TableFunctionDictionary::executeImpl(
|
||||
const ASTPtr &, ContextPtr context, const std::string & table_name, ColumnsDescription) const
|
||||
const ASTPtr &, ContextPtr context, const std::string & table_name, ColumnsDescription, bool is_insert_query) const
|
||||
{
|
||||
StorageID dict_id(getDatabaseName(), table_name);
|
||||
auto dictionary_table_structure = getActualTableStructure(context);
|
||||
auto dictionary_table_structure = getActualTableStructure(context, is_insert_query);
|
||||
|
||||
auto result = std::make_shared<StorageDictionary>(
|
||||
dict_id, dictionary_name, std::move(dictionary_table_structure), String{}, StorageDictionary::Location::Custom, context);
|
||||
|
@ -18,9 +18,9 @@ public:
|
||||
|
||||
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
|
||||
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
|
||||
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription) const override;
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription, bool is_insert_query) const override;
|
||||
|
||||
const char * getStorageTypeName() const override { return "Dictionary"; }
|
||||
|
||||
|
@ -120,12 +120,12 @@ void TableFunctionExecutable::parseArguments(const ASTPtr & ast_function, Contex
|
||||
}
|
||||
}
|
||||
|
||||
ColumnsDescription TableFunctionExecutable::getActualTableStructure(ContextPtr context) const
|
||||
ColumnsDescription TableFunctionExecutable::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
|
||||
{
|
||||
return parseColumnsListFromString(structure, context);
|
||||
}
|
||||
|
||||
StoragePtr TableFunctionExecutable::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
|
||||
StoragePtr TableFunctionExecutable::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const
|
||||
{
|
||||
auto storage_id = StorageID(getDatabaseName(), table_name);
|
||||
auto global_context = context->getGlobalContext();
|
||||
@ -135,7 +135,7 @@ StoragePtr TableFunctionExecutable::executeImpl(const ASTPtr & /*ast_function*/,
|
||||
if (settings_query != nullptr)
|
||||
settings.applyChanges(settings_query->as<ASTSetQuery>()->changes);
|
||||
|
||||
auto storage = std::make_shared<StorageExecutable>(storage_id, format, settings, input_queries, getActualTableStructure(context), ConstraintsDescription{});
|
||||
auto storage = std::make_shared<StorageExecutable>(storage_id, format, settings, input_queries, getActualTableStructure(context, is_insert_query), ConstraintsDescription{});
|
||||
storage->startup();
|
||||
return storage;
|
||||
}
|
||||
|
@ -24,11 +24,11 @@ public:
|
||||
bool hasStaticStructure() const override { return true; }
|
||||
|
||||
private:
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override;
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
|
||||
|
||||
const char * getStorageTypeName() const override { return "Executable"; }
|
||||
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
|
||||
|
||||
std::vector<size_t> skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr context) const override;
|
||||
|
||||
|
@ -91,7 +91,7 @@ void TableFunctionExplain::parseArguments(const ASTPtr & ast_function, ContextPt
|
||||
query = std::move(explain_query);
|
||||
}
|
||||
|
||||
ColumnsDescription TableFunctionExplain::getActualTableStructure(ContextPtr context) const
|
||||
ColumnsDescription TableFunctionExplain::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
|
||||
{
|
||||
Block sample_block = getInterpreter(context).getSampleBlock(query->as<ASTExplainQuery>()->getKind());
|
||||
ColumnsDescription columns_description;
|
||||
@ -123,7 +123,7 @@ static Block executeMonoBlock(QueryPipeline & pipeline)
|
||||
}
|
||||
|
||||
StoragePtr TableFunctionExplain::executeImpl(
|
||||
const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
|
||||
const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const
|
||||
{
|
||||
/// To support settings inside explain subquery.
|
||||
auto mutable_context = Context::createCopy(context);
|
||||
@ -132,7 +132,7 @@ StoragePtr TableFunctionExplain::executeImpl(
|
||||
Block block = executeMonoBlock(blockio.pipeline);
|
||||
|
||||
StorageID storage_id(getDatabaseName(), table_name);
|
||||
auto storage = std::make_shared<StorageValues>(storage_id, getActualTableStructure(context), std::move(block));
|
||||
auto storage = std::make_shared<StorageValues>(storage_id, getActualTableStructure(context, is_insert_query), std::move(block));
|
||||
storage->startup();
|
||||
return storage;
|
||||
}
|
||||
|
@ -17,7 +17,7 @@ public:
|
||||
std::string getName() const override { return name; }
|
||||
|
||||
private:
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const String & table_name, ColumnsDescription cached_columns) const override;
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const String & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
|
||||
|
||||
const char * getStorageTypeName() const override { return "Explain"; }
|
||||
|
||||
@ -25,7 +25,7 @@ private:
|
||||
|
||||
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
|
||||
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
|
||||
|
||||
InterpreterExplainQuery getInterpreter(ContextPtr context) const;
|
||||
|
||||
|
@ -83,7 +83,7 @@ StoragePtr TableFunctionFile::getStorage(const String & source,
|
||||
return std::make_shared<StorageFile>(source, global_context->getUserFilesPath(), args);
|
||||
}
|
||||
|
||||
ColumnsDescription TableFunctionFile::getActualTableStructure(ContextPtr context) const
|
||||
ColumnsDescription TableFunctionFile::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
|
||||
{
|
||||
if (structure == "auto")
|
||||
{
|
||||
|
@ -20,7 +20,7 @@ public:
|
||||
return name;
|
||||
}
|
||||
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
|
||||
|
||||
std::unordered_set<String> getVirtualsToCheckBeforeUsingStructureHint() const override
|
||||
{
|
||||
|
@ -52,7 +52,7 @@ void TableFunctionFormat::parseArguments(const ASTPtr & ast_function, ContextPtr
|
||||
structure = checkAndGetLiteralArgument<String>(args[1], "structure");
|
||||
}
|
||||
|
||||
ColumnsDescription TableFunctionFormat::getActualTableStructure(ContextPtr context) const
|
||||
ColumnsDescription TableFunctionFormat::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
|
||||
{
|
||||
if (structure == "auto")
|
||||
{
|
||||
@ -98,9 +98,9 @@ Block TableFunctionFormat::parseData(ColumnsDescription columns, ContextPtr cont
|
||||
return concatenateBlocks(blocks);
|
||||
}
|
||||
|
||||
StoragePtr TableFunctionFormat::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
|
||||
StoragePtr TableFunctionFormat::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const
|
||||
{
|
||||
auto columns = getActualTableStructure(context);
|
||||
auto columns = getActualTableStructure(context, is_insert_query);
|
||||
Block res_block = parseData(columns, context);
|
||||
auto res = std::make_shared<StorageValues>(StorageID(getDatabaseName(), table_name), columns, res_block);
|
||||
res->startup();
|
||||
|
@ -18,10 +18,10 @@ public:
|
||||
bool hasStaticStructure() const override { return false; }
|
||||
|
||||
private:
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override;
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
|
||||
const char * getStorageTypeName() const override { return "Values"; }
|
||||
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
|
||||
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
|
||||
|
||||
Block parseData(ColumnsDescription columns, ContextPtr context) const;
|
||||
|
@ -97,7 +97,7 @@ void TableFunctionGenerateRandom::parseArguments(const ASTPtr & ast_function, Co
|
||||
}
|
||||
}
|
||||
|
||||
ColumnsDescription TableFunctionGenerateRandom::getActualTableStructure(ContextPtr context) const
|
||||
ColumnsDescription TableFunctionGenerateRandom::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
|
||||
{
|
||||
if (structure == "auto")
|
||||
{
|
||||
@ -113,9 +113,9 @@ ColumnsDescription TableFunctionGenerateRandom::getActualTableStructure(ContextP
|
||||
return parseColumnsListFromString(structure, context);
|
||||
}
|
||||
|
||||
StoragePtr TableFunctionGenerateRandom::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
|
||||
StoragePtr TableFunctionGenerateRandom::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const
|
||||
{
|
||||
ColumnsDescription columns = getActualTableStructure(context);
|
||||
ColumnsDescription columns = getActualTableStructure(context, is_insert_query);
|
||||
auto res = std::make_shared<StorageGenerateRandom>(
|
||||
StorageID(getDatabaseName(), table_name), columns, String{}, max_array_length, max_string_length, random_seed);
|
||||
res->startup();
|
||||
|
@ -19,10 +19,10 @@ public:
|
||||
void setStructureHint(const ColumnsDescription & structure_hint_) override { structure_hint = structure_hint_; }
|
||||
|
||||
private:
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override;
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
|
||||
const char * getStorageTypeName() const override { return "GenerateRandom"; }
|
||||
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
|
||||
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
|
||||
|
||||
String structure = "auto";
|
||||
|
@ -28,7 +28,7 @@ StoragePtr TableFunctionHDFS::getStorage(
|
||||
compression_method_);
|
||||
}
|
||||
|
||||
ColumnsDescription TableFunctionHDFS::getActualTableStructure(ContextPtr context) const
|
||||
ColumnsDescription TableFunctionHDFS::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
|
||||
{
|
||||
if (structure == "auto")
|
||||
{
|
||||
|
@ -34,7 +34,7 @@ public:
|
||||
return signature;
|
||||
}
|
||||
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
|
||||
|
||||
std::unordered_set<String> getVirtualsToCheckBeforeUsingStructureHint() const override
|
||||
{
|
||||
|
@ -43,7 +43,7 @@ void TableFunctionInput::parseArguments(const ASTPtr & ast_function, ContextPtr
|
||||
structure = checkAndGetLiteralArgument<String>(evaluateConstantExpressionOrIdentifierAsLiteral(args[0], context), "structure");
|
||||
}
|
||||
|
||||
ColumnsDescription TableFunctionInput::getActualTableStructure(ContextPtr context) const
|
||||
ColumnsDescription TableFunctionInput::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
|
||||
{
|
||||
if (structure == "auto")
|
||||
{
|
||||
@ -58,9 +58,9 @@ ColumnsDescription TableFunctionInput::getActualTableStructure(ContextPtr contex
|
||||
return parseColumnsListFromString(structure, context);
|
||||
}
|
||||
|
||||
StoragePtr TableFunctionInput::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
|
||||
StoragePtr TableFunctionInput::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const
|
||||
{
|
||||
auto storage = std::make_shared<StorageInput>(StorageID(getDatabaseName(), table_name), getActualTableStructure(context));
|
||||
auto storage = std::make_shared<StorageInput>(StorageID(getDatabaseName(), table_name), getActualTableStructure(context, is_insert_query));
|
||||
storage->startup();
|
||||
return storage;
|
||||
}
|
||||
|
@ -20,10 +20,10 @@ public:
|
||||
void setStructureHint(const ColumnsDescription & structure_hint_) override { structure_hint = structure_hint_; }
|
||||
|
||||
private:
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override;
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
|
||||
const char * getStorageTypeName() const override { return "Input"; }
|
||||
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
|
||||
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
|
||||
|
||||
String structure;
|
||||
|
@ -8,13 +8,13 @@
|
||||
namespace DB
|
||||
{
|
||||
StoragePtr TableFunctionMeiliSearch::executeImpl(
|
||||
const ASTPtr & /* ast_function */, ContextPtr /*context*/, const String & table_name, ColumnsDescription /*cached_columns*/) const
|
||||
const ASTPtr & /* ast_function */, ContextPtr /*context*/, const String & table_name, ColumnsDescription /*cached_columns*/, bool /*is_insert_query*/) const
|
||||
{
|
||||
return std::make_shared<StorageMeiliSearch>(
|
||||
StorageID(getDatabaseName(), table_name), configuration.value(), ColumnsDescription{}, ConstraintsDescription{}, String{});
|
||||
}
|
||||
|
||||
ColumnsDescription TableFunctionMeiliSearch::getActualTableStructure(ContextPtr /* context */) const
|
||||
ColumnsDescription TableFunctionMeiliSearch::getActualTableStructure(ContextPtr /* context */, bool /*is_insert_query*/) const
|
||||
{
|
||||
return StorageMeiliSearch::getTableStructureFromData(configuration.value());
|
||||
}
|
||||
|
@ -13,11 +13,11 @@ public:
|
||||
|
||||
private:
|
||||
StoragePtr executeImpl(
|
||||
const ASTPtr & ast_function, ContextPtr context, const String & table_name, ColumnsDescription cached_columns) const override;
|
||||
const ASTPtr & ast_function, ContextPtr context, const String & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
|
||||
|
||||
const char * getStorageTypeName() const override { return "meilisearch"; }
|
||||
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
|
||||
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
|
||||
|
||||
std::optional<MeiliSearchConfiguration> configuration;
|
||||
|
@ -118,7 +118,7 @@ const TableFunctionMerge::DBToTableSetMap & TableFunctionMerge::getSourceDatabas
|
||||
return *source_databases_and_tables;
|
||||
}
|
||||
|
||||
ColumnsDescription TableFunctionMerge::getActualTableStructure(ContextPtr context) const
|
||||
ColumnsDescription TableFunctionMerge::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
|
||||
{
|
||||
for (const auto & db_with_tables : getSourceDatabasesAndTables(context))
|
||||
{
|
||||
@ -134,11 +134,11 @@ ColumnsDescription TableFunctionMerge::getActualTableStructure(ContextPtr contex
|
||||
}
|
||||
|
||||
|
||||
StoragePtr TableFunctionMerge::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
|
||||
StoragePtr TableFunctionMerge::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const
|
||||
{
|
||||
auto res = std::make_shared<StorageMerge>(
|
||||
StorageID(getDatabaseName(), table_name),
|
||||
getActualTableStructure(context),
|
||||
getActualTableStructure(context, is_insert_query),
|
||||
String{},
|
||||
source_database_name_or_regexp,
|
||||
database_is_regexp,
|
||||
|
@ -17,13 +17,13 @@ public:
|
||||
std::string getName() const override { return name; }
|
||||
|
||||
private:
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override;
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
|
||||
const char * getStorageTypeName() const override { return "Merge"; }
|
||||
|
||||
using TableSet = std::set<String>;
|
||||
using DBToTableSetMap = std::map<String, TableSet>;
|
||||
const DBToTableSetMap & getSourceDatabasesAndTables(ContextPtr context) const;
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
|
||||
std::vector<size_t> skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr context) const override;
|
||||
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
|
||||
static TableSet getMatchedTablesWithAccess(const String & database_name, const String & table_regexp, const ContextPtr & context);
|
||||
|
@ -27,9 +27,9 @@ namespace ErrorCodes
|
||||
|
||||
|
||||
StoragePtr TableFunctionMongoDB::executeImpl(const ASTPtr & /*ast_function*/,
|
||||
ContextPtr context, const String & table_name, ColumnsDescription /*cached_columns*/) const
|
||||
ContextPtr context, const String & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const
|
||||
{
|
||||
auto columns = getActualTableStructure(context);
|
||||
auto columns = getActualTableStructure(context, is_insert_query);
|
||||
auto storage = std::make_shared<StorageMongoDB>(
|
||||
StorageID(configuration->database, table_name),
|
||||
configuration->host,
|
||||
@ -46,7 +46,7 @@ StoragePtr TableFunctionMongoDB::executeImpl(const ASTPtr & /*ast_function*/,
|
||||
return storage;
|
||||
}
|
||||
|
||||
ColumnsDescription TableFunctionMongoDB::getActualTableStructure(ContextPtr context) const
|
||||
ColumnsDescription TableFunctionMongoDB::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
|
||||
{
|
||||
return parseColumnsListFromString(structure, context);
|
||||
}
|
||||
|
@ -17,11 +17,11 @@ public:
|
||||
private:
|
||||
StoragePtr executeImpl(
|
||||
const ASTPtr & ast_function, ContextPtr context,
|
||||
const std::string & table_name, ColumnsDescription cached_columns) const override;
|
||||
const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
|
||||
|
||||
const char * getStorageTypeName() const override { return "MongoDB"; }
|
||||
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
|
||||
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
|
||||
|
||||
std::optional<StorageMongoDB::Configuration> configuration;
|
||||
|
@ -57,7 +57,7 @@ void TableFunctionMySQL::parseArguments(const ASTPtr & ast_function, ContextPtr
|
||||
pool.emplace(createMySQLPoolWithFailover(*configuration, mysql_settings));
|
||||
}
|
||||
|
||||
ColumnsDescription TableFunctionMySQL::getActualTableStructure(ContextPtr context) const
|
||||
ColumnsDescription TableFunctionMySQL::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
|
||||
{
|
||||
return StorageMySQL::getTableStructureFromData(*pool, configuration->database, configuration->table, context);
|
||||
}
|
||||
@ -66,7 +66,8 @@ StoragePtr TableFunctionMySQL::executeImpl(
|
||||
const ASTPtr & /*ast_function*/,
|
||||
ContextPtr context,
|
||||
const std::string & table_name,
|
||||
ColumnsDescription /*cached_columns*/) const
|
||||
ColumnsDescription /*cached_columns*/,
|
||||
bool /*is_insert_query*/) const
|
||||
{
|
||||
auto res = std::make_shared<StorageMySQL>(
|
||||
StorageID(getDatabaseName(), table_name),
|
||||
|
@ -23,10 +23,10 @@ public:
|
||||
return name;
|
||||
}
|
||||
private:
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override;
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
|
||||
const char * getStorageTypeName() const override { return "MySQL"; }
|
||||
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
|
||||
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
|
||||
|
||||
mutable std::optional<mysqlxx::PoolWithFailover> pool;
|
||||
|
@ -32,14 +32,14 @@ void TableFunctionNull::parseArguments(const ASTPtr & ast_function, ContextPtr c
|
||||
structure = checkAndGetLiteralArgument<String>(evaluateConstantExpressionOrIdentifierAsLiteral(arguments[0], context), "structure");
|
||||
}
|
||||
|
||||
ColumnsDescription TableFunctionNull::getActualTableStructure(ContextPtr context) const
|
||||
ColumnsDescription TableFunctionNull::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
|
||||
{
|
||||
if (structure != "auto")
|
||||
return parseColumnsListFromString(structure, context);
|
||||
return default_structure;
|
||||
}
|
||||
|
||||
StoragePtr TableFunctionNull::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
|
||||
StoragePtr TableFunctionNull::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool /*is_insert_query*/) const
|
||||
{
|
||||
ColumnsDescription columns;
|
||||
if (structure != "auto")
|
||||
|
@ -23,11 +23,11 @@ public:
|
||||
void setStructureHint(const ColumnsDescription & structure_hint_) override { structure_hint = structure_hint_; }
|
||||
|
||||
private:
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const String & table_name, ColumnsDescription cached_columns) const override;
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const String & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
|
||||
const char * getStorageTypeName() const override { return "Null"; }
|
||||
|
||||
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
|
||||
|
||||
String structure = "auto";
|
||||
ColumnsDescription structure_hint;
|
||||
|
@ -23,14 +23,14 @@ namespace ErrorCodes
|
||||
|
||||
|
||||
template <bool multithreaded>
|
||||
ColumnsDescription TableFunctionNumbers<multithreaded>::getActualTableStructure(ContextPtr /*context*/) const
|
||||
ColumnsDescription TableFunctionNumbers<multithreaded>::getActualTableStructure(ContextPtr /*context*/, bool /*is_insert_query*/) const
|
||||
{
|
||||
/// NOTE: https://bugs.llvm.org/show_bug.cgi?id=47418
|
||||
return ColumnsDescription{{{"number", std::make_shared<DataTypeUInt64>()}}};
|
||||
}
|
||||
|
||||
template <bool multithreaded>
|
||||
StoragePtr TableFunctionNumbers<multithreaded>::executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
|
||||
StoragePtr TableFunctionNumbers<multithreaded>::executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool /*is_insert_query*/) const
|
||||
{
|
||||
if (const auto * function = ast_function->as<ASTFunction>())
|
||||
{
|
||||
|
@ -19,12 +19,12 @@ public:
|
||||
std::string getName() const override { return name; }
|
||||
bool hasStaticStructure() const override { return true; }
|
||||
private:
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override;
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
|
||||
const char * getStorageTypeName() const override { return "SystemNumbers"; }
|
||||
|
||||
UInt64 evaluateArgument(ContextPtr context, ASTPtr & argument) const;
|
||||
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
|
||||
};
|
||||
|
||||
|
||||
|
@ -20,7 +20,7 @@ namespace ErrorCodes
|
||||
|
||||
|
||||
StoragePtr TableFunctionPostgreSQL::executeImpl(const ASTPtr & /*ast_function*/,
|
||||
ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
|
||||
ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool /*is_insert_query*/) const
|
||||
{
|
||||
auto result = std::make_shared<StoragePostgreSQL>(
|
||||
StorageID(getDatabaseName(), table_name),
|
||||
@ -38,7 +38,7 @@ StoragePtr TableFunctionPostgreSQL::executeImpl(const ASTPtr & /*ast_function*/,
|
||||
}
|
||||
|
||||
|
||||
ColumnsDescription TableFunctionPostgreSQL::getActualTableStructure(ContextPtr context) const
|
||||
ColumnsDescription TableFunctionPostgreSQL::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
|
||||
{
|
||||
return StoragePostgreSQL::getTableStructureFromData(connection_pool, configuration->table, configuration->schema, context);
|
||||
}
|
||||
|
@ -20,11 +20,11 @@ public:
|
||||
private:
|
||||
StoragePtr executeImpl(
|
||||
const ASTPtr & ast_function, ContextPtr context,
|
||||
const std::string & table_name, ColumnsDescription cached_columns) const override;
|
||||
const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
|
||||
|
||||
const char * getStorageTypeName() const override { return "PostgreSQL"; }
|
||||
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
|
||||
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
|
||||
|
||||
postgres::PoolWithFailoverPtr connection_pool;
|
||||
|
@ -25,9 +25,9 @@ namespace ErrorCodes
|
||||
}
|
||||
|
||||
StoragePtr TableFunctionRedis::executeImpl(
|
||||
const ASTPtr & /*ast_function*/, ContextPtr context, const String & table_name, ColumnsDescription /*cached_columns*/) const
|
||||
const ASTPtr & /*ast_function*/, ContextPtr context, const String & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const
|
||||
{
|
||||
auto columns = getActualTableStructure(context);
|
||||
auto columns = getActualTableStructure(context, is_insert_query);
|
||||
|
||||
StorageInMemoryMetadata metadata;
|
||||
metadata.setColumns(columns);
|
||||
@ -39,7 +39,7 @@ StoragePtr TableFunctionRedis::executeImpl(
|
||||
return storage;
|
||||
}
|
||||
|
||||
ColumnsDescription TableFunctionRedis::getActualTableStructure(ContextPtr context) const
|
||||
ColumnsDescription TableFunctionRedis::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
|
||||
{
|
||||
return parseColumnsListFromString(structure, context);
|
||||
}
|
||||
|
@ -19,11 +19,11 @@ public:
|
||||
private:
|
||||
StoragePtr executeImpl(
|
||||
const ASTPtr & ast_function, ContextPtr context,
|
||||
const String & table_name, ColumnsDescription cached_columns) const override;
|
||||
const String & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
|
||||
|
||||
const char * getStorageTypeName() const override { return "Redis"; }
|
||||
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
|
||||
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
|
||||
|
||||
RedisConfiguration configuration;
|
||||
|
@ -276,12 +276,12 @@ void TableFunctionRemote::parseArguments(const ASTPtr & ast_function, ContextPtr
|
||||
remote_table_id.table_name = table;
|
||||
}
|
||||
|
||||
StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const
|
||||
StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const
|
||||
{
|
||||
/// StorageDistributed supports mismatching structure of remote table, so we can use outdated structure for CREATE ... AS remote(...)
|
||||
/// without additional conversion in StorageTableFunctionProxy
|
||||
if (cached_columns.empty())
|
||||
cached_columns = getActualTableStructure(context);
|
||||
cached_columns = getActualTableStructure(context, is_insert_query);
|
||||
|
||||
assert(cluster);
|
||||
StoragePtr res = remote_table_function_ptr
|
||||
@ -318,7 +318,7 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & /*ast_function*/, Con
|
||||
return res;
|
||||
}
|
||||
|
||||
ColumnsDescription TableFunctionRemote::getActualTableStructure(ContextPtr context) const
|
||||
ColumnsDescription TableFunctionRemote::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
|
||||
{
|
||||
assert(cluster);
|
||||
return getStructureOfRemoteTable(*cluster, remote_table_id, context, remote_table_function_ptr);
|
||||
|
@ -22,13 +22,13 @@ public:
|
||||
|
||||
std::string getName() const override { return name; }
|
||||
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
|
||||
|
||||
bool needStructureConversion() const override { return false; }
|
||||
|
||||
private:
|
||||
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override;
|
||||
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
|
||||
const char * getStorageTypeName() const override { return "Distributed"; }
|
||||
|
||||
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
|
||||
|
@ -313,7 +313,7 @@ void TableFunctionS3::addColumnsStructureToArguments(ASTs & args, const String &
|
||||
}
|
||||
}
|
||||
|
||||
ColumnsDescription TableFunctionS3::getActualTableStructure(ContextPtr context) const
|
||||
ColumnsDescription TableFunctionS3::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
|
||||
{
|
||||
if (configuration.structure == "auto")
|
||||
{
|
||||
@ -330,7 +330,7 @@ bool TableFunctionS3::supportsReadingSubsetOfColumns()
|
||||
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format);
|
||||
}
|
||||
|
||||
StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
|
||||
StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool /*is_insert_query*/) const
|
||||
{
|
||||
S3::URI s3_uri (configuration.url);
|
||||
|
||||
|
@ -64,11 +64,12 @@ protected:
|
||||
const ASTPtr & ast_function,
|
||||
ContextPtr context,
|
||||
const std::string & table_name,
|
||||
ColumnsDescription cached_columns) const override;
|
||||
ColumnsDescription cached_columns,
|
||||
bool is_insert_query) const override;
|
||||
|
||||
const char * getStorageTypeName() const override { return "S3"; }
|
||||
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
|
||||
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
|
||||
|
||||
mutable StorageS3::Configuration configuration;
|
||||
|
@ -17,7 +17,7 @@ namespace DB
|
||||
|
||||
StoragePtr TableFunctionS3Cluster::executeImpl(
|
||||
const ASTPtr & /*function*/, ContextPtr context,
|
||||
const std::string & table_name, ColumnsDescription /*cached_columns*/) const
|
||||
const std::string & table_name, ColumnsDescription /*cached_columns*/, bool /*is_insert_query*/) const
|
||||
{
|
||||
StoragePtr storage;
|
||||
ColumnsDescription columns;
|
||||
|
@ -52,7 +52,8 @@ protected:
|
||||
const ASTPtr & ast_function,
|
||||
ContextPtr context,
|
||||
const std::string & table_name,
|
||||
ColumnsDescription cached_columns) const override;
|
||||
ColumnsDescription cached_columns,
|
||||
bool is_insert_query) const override;
|
||||
|
||||
const char * getStorageTypeName() const override { return "S3Cluster"; }
|
||||
};
|
||||
|
@ -29,7 +29,7 @@ namespace ErrorCodes
|
||||
|
||||
|
||||
StoragePtr TableFunctionSQLite::executeImpl(const ASTPtr & /*ast_function*/,
|
||||
ContextPtr context, const String & table_name, ColumnsDescription /*cached_columns*/) const
|
||||
ContextPtr context, const String & table_name, ColumnsDescription /*cached_columns*/, bool /*is_insert_query*/) const
|
||||
{
|
||||
auto storage = std::make_shared<StorageSQLite>(StorageID(getDatabaseName(), table_name),
|
||||
sqlite_db,
|
||||
@ -42,7 +42,7 @@ StoragePtr TableFunctionSQLite::executeImpl(const ASTPtr & /*ast_function*/,
|
||||
}
|
||||
|
||||
|
||||
ColumnsDescription TableFunctionSQLite::getActualTableStructure(ContextPtr /* context */) const
|
||||
ColumnsDescription TableFunctionSQLite::getActualTableStructure(ContextPtr /* context */, bool /*is_insert_query*/) const
|
||||
{
|
||||
return StorageSQLite::getTableStructureFromData(sqlite_db, remote_table_name);
|
||||
}
|
||||
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue
Block a user