Merge branch 'master' into fix_bad_exception

This commit is contained in:
alesapin 2022-08-23 12:22:05 +02:00
commit 802d56df49
20 changed files with 237 additions and 72 deletions

2
contrib/NuRaft vendored

@ -1 +1 @@
Subproject commit 1b0af760b3506b8e35b50cb7df098cbad5064ff2
Subproject commit 33f60f961d4914441b684af43e9e5535078ba54b

View File

@ -15,11 +15,12 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = EmbeddedRocksDB PRIMARY KEY(primary_key_name)
) ENGINE = EmbeddedRocksDB([ttl]) PRIMARY KEY(primary_key_name)
```
Required parameters:
Engine parameters:
- `ttl` - time to live for values. TTL is accepted in seconds. If TTL is 0, regular RocksDB instance is used (without TTL).
- `primary_key_name` any column name in the column list.
- `primary key` must be specified, it supports only one column in the primary key. The primary key will be serialized in binary as a `rocksdb key`.
- columns other than the primary key will be serialized in binary as `rocksdb` value in corresponding order.

View File

@ -1,6 +1,6 @@
---
sidebar_position: 69
sidebar_label: "Named connections"
sidebar_label: "Named collections"
---
# Storing details for connecting to external sources in configuration files
@ -12,7 +12,7 @@ from users with only SQL access.
Parameters can be set in XML `<format>CSV</format>` and overridden in SQL `, format = 'TSV'`.
The parameters in SQL can be overridden using format `key` = `value`: `compression_method = 'gzip'`.
Named connections are stored in the `config.xml` file of the ClickHouse server in the `<named_collections>` section and are applied when ClickHouse starts.
Named collections are stored in the `config.xml` file of the ClickHouse server in the `<named_collections>` section and are applied when ClickHouse starts.
Example of configuration:
```xml
@ -24,7 +24,7 @@ $ cat /etc/clickhouse-server/config.d/named_collections.xml
</clickhouse>
```
## Named connections for accessing S3.
## Named collections for accessing S3.
The description of parameters see [s3 Table Function](../sql-reference/table-functions/s3.md).
@ -42,7 +42,7 @@ Example of configuration:
</clickhouse>
```
### Example of using named connections with the s3 function
### Example of using named collections with the s3 function
```sql
INSERT INTO FUNCTION s3(s3_mydata, filename = 'test_file.tsv.gz',
@ -58,7 +58,7 @@ FROM s3(s3_mydata, filename = 'test_file.tsv.gz')
1 rows in set. Elapsed: 0.279 sec. Processed 10.00 thousand rows, 90.00 KB (35.78 thousand rows/s., 322.02 KB/s.)
```
### Example of using named connections with an S3 table
### Example of using named collections with an S3 table
```sql
CREATE TABLE s3_engine_table (number Int64)
@ -73,7 +73,7 @@ SELECT * FROM s3_engine_table LIMIT 3;
└────────┘
```
## Named connections for accessing MySQL database
## Named collections for accessing MySQL database
The description of parameters see [mysql](../sql-reference/table-functions/mysql.md).
@ -95,7 +95,7 @@ Example of configuration:
</clickhouse>
```
### Example of using named connections with the mysql function
### Example of using named collections with the mysql function
```sql
SELECT count() FROM mysql(mymysql, table = 'test');
@ -105,7 +105,7 @@ SELECT count() FROM mysql(mymysql, table = 'test');
└─────────┘
```
### Example of using named connections with an MySQL table
### Example of using named collections with an MySQL table
```sql
CREATE TABLE mytable(A Int64) ENGINE = MySQL(mymysql, table = 'test', connection_pool_size=3, replace_query=0);
@ -116,7 +116,7 @@ SELECT count() FROM mytable;
└─────────┘
```
### Example of using named connections with database with engine MySQL
### Example of using named collections with database with engine MySQL
```sql
CREATE DATABASE mydatabase ENGINE = MySQL(mymysql);
@ -129,7 +129,7 @@ SHOW TABLES FROM mydatabase;
└────────┘
```
### Example of using named connections with an external dictionary with source MySQL
### Example of using named collections with an external dictionary with source MySQL
```sql
CREATE DICTIONARY dict (A Int64, B String)
@ -145,7 +145,7 @@ SELECT dictGet('dict', 'B', 2);
└─────────────────────────┘
```
## Named connections for accessing PostgreSQL database
## Named collections for accessing PostgreSQL database
The description of parameters see [postgresql](../sql-reference/table-functions/postgresql.md).
@ -166,7 +166,7 @@ Example of configuration:
</clickhouse>
```
### Example of using named connections with the postgresql function
### Example of using named collections with the postgresql function
```sql
SELECT * FROM postgresql(mypg, table = 'test');
@ -186,8 +186,7 @@ SELECT * FROM postgresql(mypg, table = 'test', schema = 'public');
└───┘
```
### Example of using named connections with database with engine PostgreSQL
### Example of using named collections with database with engine PostgreSQL
```sql
CREATE TABLE mypgtable (a Int64) ENGINE = PostgreSQL(mypg, table = 'test', schema = 'public');
@ -201,7 +200,7 @@ SELECT * FROM mypgtable;
└───┘
```
### Example of using named connections with database with engine PostgreSQL
### Example of using named collections with database with engine PostgreSQL
```sql
CREATE DATABASE mydatabase ENGINE = PostgreSQL(mypg);
@ -213,7 +212,7 @@ SHOW TABLES FROM mydatabase
└──────┘
```
### Example of using named connections with an external dictionary with source POSTGRESQL
### Example of using named collections with an external dictionary with source POSTGRESQL
```sql
CREATE DICTIONARY dict (a Int64, b String)
@ -228,3 +227,59 @@ SELECT dictGet('dict', 'b', 2);
│ two │
└─────────────────────────┘
```
## Named collections for accessing remote ClickHouse database
The description of parameters see [remote](../sql-reference/table-functions/remote.md/#parameters).
Example of configuration:
```xml
<clickhouse>
<named_collections>
<remote1>
<host>localhost</host>
<port>9000</port>
<database>system</database>
<user>foo</user>
<password>secret</password>
</remote1>
</named_collections>
</clickhouse>
```
### Example of using named collections with the `remote`/`remoteSecure` functions
```sql
SELECT * FROM remote(remote1, table = one);
┌─dummy─┐
│ 0 │
└───────┘
SELECT * FROM remote(remote1, database = merge(system, '^one'));
┌─dummy─┐
│ 0 │
└───────┘
INSERT INTO FUNCTION remote(remote1, database = default, table = test) VALUES (1,'a');
SELECT * FROM remote(remote1, database = default, table = test);
┌─a─┬─b─┐
│ 1 │ a │
└───┴───┘
```
### Example of using named collections with an external dictionary with source ClickHouse
```sql
CREATE DICTIONARY dict(a Int64, b String)
PRIMARY KEY a
SOURCE(CLICKHOUSE(NAME remote1 TABLE test DB default))
LIFETIME(MIN 1 MAX 2)
LAYOUT(HASHED());
SELECT dictGet('dict', 'b', 1);
┌─dictGet('dict', 'b', 1)─┐
│ a │
└─────────────────────────┘
```

View File

@ -441,6 +441,8 @@ For more information, see the section “[Configuration files](../../operations/
## interserver_listen_host {#interserver-listen-host}
Restriction on hosts that can exchange data between ClickHouse servers.
If Keeper is used, the same restriction will be applied to the communication
between different Keeper instances.
The default value equals to `listen_host` setting.
Examples:

View File

@ -411,6 +411,8 @@ ClickHouse проверяет условия для `min_part_size` и `min_part
## interserver_listen_host {#interserver-listen-host}
Ограничение по хостам, для обмена между серверами ClickHouse.
Если используется Keeper, то такое же ограничение будет применяться к обмену данными
между различными экземплярами Keeper.
Значение по умолчанию совпадает со значением параметра listen_host
Примеры:

View File

@ -86,13 +86,10 @@ static void splitHostAndPort(const std::string & host_and_port, std::string & ou
static DNSResolver::IPAddresses hostByName(const std::string & host)
{
/// Family: AF_UNSPEC
/// AI_ALL is required for checking if client is allowed to connect from an address
auto flags = Poco::Net::DNS::DNS_HINT_AI_V4MAPPED | Poco::Net::DNS::DNS_HINT_AI_ALL;
/// Do not resolve IPv6 (or IPv4) if no local IPv6 (or IPv4) addresses are configured.
/// It should not affect client address checking, since client cannot connect from IPv6 address
/// if server has no IPv6 addresses.
flags |= Poco::Net::DNS::DNS_HINT_AI_ADDRCONFIG;
auto flags = Poco::Net::DNS::DNS_HINT_AI_ADDRCONFIG;
DNSResolver::IPAddresses addresses;

View File

@ -23,6 +23,7 @@
#include <Common/LockMemoryExceptionInThread.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Common/Stopwatch.h>
#include <Common/getMultipleKeysFromConfig.h>
namespace DB
{
@ -259,7 +260,7 @@ void KeeperServer::forceRecovery()
raft_instance->update_params(params);
}
void KeeperServer::launchRaftServer(bool enable_ipv6)
void KeeperServer::launchRaftServer(const Poco::Util::AbstractConfiguration & config, bool enable_ipv6)
{
nuraft::raft_params params;
params.heart_beat_interval_
@ -311,10 +312,26 @@ void KeeperServer::launchRaftServer(bool enable_ipv6)
nuraft::ptr<nuraft::logger> logger = nuraft::cs_new<LoggerWrapper>("RaftInstance", coordination_settings->raft_logs_level);
asio_service = nuraft::cs_new<nuraft::asio_service>(asio_opts, logger);
asio_listener = asio_service->create_rpc_listener(state_manager->getPort(), logger, enable_ipv6);
if (!asio_listener)
return;
// we use the same config as for the CH replicas because it is for internal communication between Keeper instances
std::vector<std::string> listen_hosts = DB::getMultipleValuesFromConfig(config, "", "interserver_listen_host");
if (listen_hosts.empty())
{
auto asio_listener = asio_service->create_rpc_listener(state_manager->getPort(), logger, enable_ipv6);
if (!asio_listener)
return;
asio_listeners.emplace_back(std::move(asio_listener));
}
else
{
for (const auto & listen_host : listen_hosts)
{
auto asio_listener = asio_service->create_rpc_listener(listen_host, state_manager->getPort(), logger);
if (asio_listener)
asio_listeners.emplace_back(std::move(asio_listener));
}
}
nuraft::ptr<nuraft::delayed_task_scheduler> scheduler = asio_service;
nuraft::ptr<nuraft::rpc_client_factory> rpc_cli_factory = asio_service;
@ -324,17 +341,21 @@ void KeeperServer::launchRaftServer(bool enable_ipv6)
/// raft_server creates unique_ptr from it
nuraft::context * ctx
= new nuraft::context(casted_state_manager, casted_state_machine, asio_listener, logger, rpc_cli_factory, scheduler, params);
= new nuraft::context(casted_state_manager, casted_state_machine, asio_listeners, logger, rpc_cli_factory, scheduler, params);
raft_instance = nuraft::cs_new<KeeperRaftServer>(ctx, init_options);
if (!raft_instance)
throw Exception(ErrorCodes::RAFT_ERROR, "Cannot allocate RAFT instance");
raft_instance->start_server(init_options.skip_initial_election_timeout_);
nuraft::ptr<nuraft::raft_server> casted_raft_server = raft_instance;
asio_listener->listen(casted_raft_server);
if (!raft_instance)
throw Exception(ErrorCodes::RAFT_ERROR, "Cannot allocate RAFT instance");
for (const auto & asio_listener : asio_listeners)
{
asio_listener->listen(casted_raft_server);
}
}
void KeeperServer::startup(const Poco::Util::AbstractConfiguration & config, bool enable_ipv6)
@ -364,7 +385,7 @@ void KeeperServer::startup(const Poco::Util::AbstractConfiguration & config, boo
last_local_config = state_manager->parseServersConfiguration(config, true).cluster_config;
launchRaftServer(enable_ipv6);
launchRaftServer(config, enable_ipv6);
keeper_context->server_state = KeeperContext::Phase::RUNNING;
}
@ -388,10 +409,13 @@ void KeeperServer::shutdownRaftServer()
raft_instance.reset();
if (asio_listener)
for (const auto & asio_listener : asio_listeners)
{
asio_listener->stop();
asio_listener->shutdown();
if (asio_listener)
{
asio_listener->stop();
asio_listener->shutdown();
}
}
if (asio_service)

View File

@ -30,7 +30,7 @@ private:
struct KeeperRaftServer;
nuraft::ptr<KeeperRaftServer> raft_instance;
nuraft::ptr<nuraft::asio_service> asio_service;
nuraft::ptr<nuraft::rpc_listener> asio_listener;
std::vector<nuraft::ptr<nuraft::rpc_listener>> asio_listeners;
// because some actions can be applied
// when we are sure that there are no requests currently being
// processed (e.g. recovery) we do all write actions
@ -52,7 +52,7 @@ private:
/// Almost copy-paste from nuraft::launcher, but with separated server init and start
/// Allows to avoid race conditions.
void launchRaftServer(bool enable_ipv6);
void launchRaftServer(const Poco::Util::AbstractConfiguration & config, bool enable_ipv6);
void shutdownRaftServer();

View File

@ -183,23 +183,28 @@ bool HadoopSnappyReadBuffer::nextImpl()
if (eof)
return false;
if (!in_available)
do
{
in->nextIfAtEnd();
in_available = in->buffer().end() - in->position();
in_data = in->position();
if (!in_available)
{
in->nextIfAtEnd();
in_available = in->buffer().end() - in->position();
in_data = in->position();
}
if (decoder->result == Status::NEEDS_MORE_INPUT && (!in_available || in->eof()))
{
throw Exception(String("hadoop snappy decode error:") + statusToString(decoder->result), ErrorCodes::SNAPPY_UNCOMPRESS_FAILED);
}
out_capacity = internal_buffer.size();
out_data = internal_buffer.begin();
decoder->result = decoder->readBlock(&in_available, &in_data, &out_capacity, &out_data);
in->position() = in->buffer().end() - in_available;
}
while (decoder->result == Status::NEEDS_MORE_INPUT);
if (decoder->result == Status::NEEDS_MORE_INPUT && (!in_available || in->eof()))
{
throw Exception(String("hadoop snappy decode error:") + statusToString(decoder->result), ErrorCodes::SNAPPY_UNCOMPRESS_FAILED);
}
out_capacity = internal_buffer.size();
out_data = internal_buffer.begin();
decoder->result = decoder->readBlock(&in_available, &in_data, &out_capacity, &out_data);
in->position() = in->buffer().end() - in_available;
working_buffer.resize(internal_buffer.size() - out_capacity);
if (decoder->result == Status::OK)

View File

@ -60,7 +60,8 @@ TEST(HadoopSnappyDecoder, repeatNeedMoreInput)
String output;
WriteBufferFromString out(output);
copyData(read_buffer, out);
out.finalize();
UInt128 hashcode = sipHash128(output.c_str(), output.size());
String hashcode_str = getHexUIntLowercase(hashcode);
ASSERT_EQ(hashcode_str, "593afe14f61866915cc00b8c7bd86046");
ASSERT_EQ(hashcode_str, "673e5b065186cec146789451c2a8f703");
}

View File

@ -1,4 +1,5 @@
#include <Parsers/ASTQueryWithOutput.h>
#include <Parsers/ASTSetQuery.h>
namespace DB
{
@ -40,7 +41,7 @@ void ASTQueryWithOutput::formatImpl(const FormatSettings & s, FormatState & stat
format->formatImpl(s, state, frame);
}
if (settings_ast)
if (settings_ast && assert_cast<ASTSetQuery *>(settings_ast.get())->print_in_format)
{
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << "SETTINGS " << (s.hilite ? hilite_none : "");
settings_ast->formatImpl(s, state, frame);

View File

@ -192,7 +192,7 @@ void ASTSelectQuery::formatImpl(const FormatSettings & s, FormatState & state, F
limitOffset()->formatImpl(s, state, frame);
}
if (settings())
if (settings() && assert_cast<ASTSetQuery *>(settings().get())->print_in_format)
{
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << "SETTINGS " << (s.hilite ? hilite_none : "");
settings()->formatImpl(s, state, frame);

View File

@ -14,6 +14,12 @@ class ASTSetQuery : public IAST
public:
bool is_standalone = true; /// If false, this AST is a part of another query, such as SELECT.
/// To support overriding certain settings in a **subquery**, we add a ASTSetQuery with Settings to all subqueries, containing
/// the list of all settings that affect them (specifically or globally to the whole query).
/// We use `print_in_format` to avoid printing these nodes when they were left unchanged from the parent copy
/// See more: https://github.com/ClickHouse/ClickHouse/issues/38895
bool print_in_format = true;
SettingsChanges changes;
NameToNameMap query_parameters;

View File

@ -142,7 +142,9 @@ bool ParserQueryWithOutput::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
// Pass them manually, to apply in InterpreterSelectQuery::initSettings()
if (query->as<ASTSelectWithUnionQuery>())
{
QueryWithOutputSettingsPushDownVisitor::Data data{query_with_output.settings_ast};
auto settings = query_with_output.settings_ast->clone();
assert_cast<ASTSetQuery *>(settings.get())->print_in_format = false;
QueryWithOutputSettingsPushDownVisitor::Data data{settings};
QueryWithOutputSettingsPushDownVisitor(data).visit(query);
}
}

View File

@ -2,7 +2,7 @@
#include <Storages/RocksDB/StorageEmbeddedRocksDB.h>
#include <IO/WriteBufferFromString.h>
#include <rocksdb/db.h>
#include <rocksdb/utilities/db_ttl.h>
namespace DB

View File

@ -1,3 +1,4 @@
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/RocksDB/StorageEmbeddedRocksDB.h>
#include <Storages/RocksDB/EmbeddedRocksDBSink.h>
@ -21,9 +22,9 @@
#include <Common/Exception.h>
#include <base/sort.h>
#include <rocksdb/db.h>
#include <rocksdb/table.h>
#include <rocksdb/convenience.h>
#include <rocksdb/utilities/db_ttl.h>
#include <cstddef>
#include <filesystem>
@ -164,10 +165,12 @@ StorageEmbeddedRocksDB::StorageEmbeddedRocksDB(const StorageID & table_id_,
const StorageInMemoryMetadata & metadata_,
bool attach,
ContextPtr context_,
const String & primary_key_)
const String & primary_key_,
Int32 ttl_)
: IStorage(table_id_)
, WithContext(context_->getGlobalContext())
, primary_key{primary_key_}
, ttl(ttl_)
{
setInMemoryMetadata(metadata_);
rocksdb_dir = context_->getPath() + relative_data_path_;
@ -193,7 +196,6 @@ void StorageEmbeddedRocksDB::initDB()
{
rocksdb::Status status;
rocksdb::Options base;
rocksdb::DB * db;
base.create_if_missing = true;
base.compression = rocksdb::CompressionType::kZSTD;
@ -264,15 +266,28 @@ void StorageEmbeddedRocksDB::initDB()
}
}
status = rocksdb::DB::Open(merged, rocksdb_dir, &db);
if (!status.ok())
if (ttl > 0)
{
throw Exception(ErrorCodes::ROCKSDB_ERROR, "Fail to open rocksdb path at: {}: {}",
rocksdb_dir, status.ToString());
rocksdb::DBWithTTL * db;
status = rocksdb::DBWithTTL::Open(merged, rocksdb_dir, &db, ttl);
if (!status.ok())
{
throw Exception(ErrorCodes::ROCKSDB_ERROR, "Failed to open rocksdb path at: {}: {}",
rocksdb_dir, status.ToString());
}
rocksdb_ptr = std::unique_ptr<rocksdb::DBWithTTL>(db);
}
else
{
rocksdb::DB * db;
status = rocksdb::DB::Open(merged, rocksdb_dir, &db);
if (!status.ok())
{
throw Exception(ErrorCodes::ROCKSDB_ERROR, "Failed to open rocksdb path at: {}: {}",
rocksdb_dir, status.ToString());
}
rocksdb_ptr = std::unique_ptr<rocksdb::DB>(db);
}
/// It's ok just to wrap db with unique_ptr, from rdb documentation: "when you are done with a database, just delete the database object"
rocksdb_ptr = std::unique_ptr<rocksdb::DB>(db);
}
Pipe StorageEmbeddedRocksDB::read(
@ -335,10 +350,16 @@ SinkToStoragePtr StorageEmbeddedRocksDB::write(
static StoragePtr create(const StorageFactory::Arguments & args)
{
// TODO custom RocksDBSettings, table function
if (!args.engine_args.empty())
throw Exception(
"Engine " + args.engine_name + " doesn't support any arguments (" + toString(args.engine_args.size()) + " given)",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
auto engine_args = args.engine_args;
if (engine_args.size() > 1)
{
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Engine {} requires at most 1 parameter. ({} given). Correct usage: EmbeddedRocksDB([ttl])",
args.engine_name, engine_args.size());
}
Int32 ttl{0};
if (!engine_args.empty())
ttl = checkAndGetLiteralArgument<UInt64>(engine_args[0], "ttl");
StorageInMemoryMetadata metadata;
metadata.setColumns(args.columns);
@ -353,7 +374,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
{
throw Exception("StorageEmbeddedRocksDB must require one column in primary key", ErrorCodes::BAD_ARGUMENTS);
}
return std::make_shared<StorageEmbeddedRocksDB>(args.table_id, args.relative_data_path, metadata, args.attach, args.getContext(), primary_key_names[0]);
return std::make_shared<StorageEmbeddedRocksDB>(args.table_id, args.relative_data_path, metadata, args.attach, args.getContext(), primary_key_names[0], ttl);
}
std::shared_ptr<rocksdb::Statistics> StorageEmbeddedRocksDB::getRocksDBStatistics() const
@ -449,6 +470,7 @@ void registerStorageEmbeddedRocksDB(StorageFactory & factory)
{
StorageFactory::StorageFeatures features{
.supports_sort_order = true,
.supports_ttl = true,
.supports_parallel_insert = true,
};

View File

@ -32,7 +32,8 @@ public:
const StorageInMemoryMetadata & metadata,
bool attach,
ContextPtr context_,
const String & primary_key_);
const String & primary_key_,
Int32 ttl_ = 0);
std::string getName() const override { return "EmbeddedRocksDB"; }
@ -80,6 +81,7 @@ private:
RocksDBPtr rocksdb_ptr;
mutable std::shared_mutex rocksdb_ptr_mx;
String rocksdb_dir;
Int32 ttl;
void initDB();
};

View File

@ -30,6 +30,18 @@ def test_valid_options(start_cluster):
DROP TABLE test;
"""
)
node.query(
"""
CREATE TABLE test (key UInt64, value String) Engine=EmbeddedRocksDB(0) PRIMARY KEY(key);
DROP TABLE test;
"""
)
node.query(
"""
CREATE TABLE test (key UInt64, value String) Engine=EmbeddedRocksDB(10) PRIMARY KEY(key);
DROP TABLE test;
"""
)
def test_invalid_options(start_cluster):

View File

@ -0,0 +1,17 @@
SELECT 1
FORMAT CSV
SETTINGS max_execution_time = 0.001
SELECT 1
SETTINGS max_execution_time = 0.001
FORMAT CSV
SELECT 1
UNION ALL
SELECT 2
FORMAT CSV
SETTINGS max_execution_time = 0.001
SELECT 1
SETTINGS max_threads = 1
UNION ALL
SELECT 2
SETTINGS max_execution_time = 2
FORMAT `Null`

View File

@ -0,0 +1,16 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
set -e
format="$CLICKHOUSE_FORMAT"
echo "select 1 format CSV settings max_execution_time = 0.001" | $format
echo "select 1 settings max_execution_time = 0.001 format CSV" | $format
echo "select 1 UNION ALL Select 2 format CSV settings max_execution_time = 0.001" | $format
# I don't think having multiple settings makes sense, but it's supported so test that it still works
echo "select 1 settings max_threads=1 UNION ALL select 2 settings max_execution_time=2 format Null" | $format