Merge branch 'master' into PF202201250933

mergify[bot] 2022-01-25 18:56:50 +00:00 committed by GitHub
commit 00d728a9d2
47 changed files with 883 additions and 164 deletions

contrib/lz4 vendored

@ -1 +1 @@
Subproject commit f39b79fb02962a1cd880bbdecb6dffba4f754a11
Subproject commit 4c9431e9af596af0556e5da0ae99305bafb2b10b

View File

@ -159,8 +159,7 @@ Configuration fields:
| Tag | Description | Required |
|------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|
| `name` | Column name. | Yes |
| `type` | ClickHouse data type: [UInt8](../../../sql-reference/data-types/int-uint.md), [UInt16](../../../sql-reference/data-types/int-uint.md), [UInt32](../../../sql-reference/data-types/int-uint.md), [UInt64](../../../sql-reference/data-types/int-uint.md), [Int8](../../../sql-reference/data-types/int-uint.md), [Int16](../../../sql-reference/data-types/int-uint.md), [Int32](../../../sql-reference/data-types/int-uint.md), [Int64](../../../sql-reference/data-types/int-uint.md), [Float32](../../../sql-reference/data-types/float.md), [Float64](../../../sql-reference/data-types/float.md), [UUID](../../../sql-reference/data-types/uuid.md), [Decimal32](../../../sql-reference/data-types/decimal.md), [Decimal64](../../../sql-reference/data-types/decimal.md), [Decimal128](../../../sql-reference/data-types/decimal.md), [Decimal256](../../../sql-reference/data-types/decimal.md),
[Date](../../../sql-reference/data-types/date.md), [Date32](../../../sql-reference/data-types/date32.md), [DateTime](../../../sql-reference/data-types/datetime.md), [DateTime64](../../../sql-reference/data-types/datetime64.md), [String](../../../sql-reference/data-types/string.md), [Array](../../../sql-reference/data-types/array.md).<br/>ClickHouse tries to cast value from dictionary to the specified data type. For example, for MySQL, the field might be `TEXT`, `VARCHAR`, or `BLOB` in the MySQL source table, but it can be uploaded as `String` in ClickHouse.<br/>[Nullable](../../../sql-reference/data-types/nullable.md) is currently supported for [Flat](external-dicts-dict-layout.md#flat), [Hashed](external-dicts-dict-layout.md#dicts-external_dicts_dict_layout-hashed), [ComplexKeyHashed](external-dicts-dict-layout.md#complex-key-hashed), [Direct](external-dicts-dict-layout.md#direct), [ComplexKeyDirect](external-dicts-dict-layout.md#complex-key-direct), [RangeHashed](external-dicts-dict-layout.md#range-hashed), [Polygon](external-dicts-dict-polygon.md), [Cache](external-dicts-dict-layout.md#cache), [ComplexKeyCache](external-dicts-dict-layout.md#complex-key-cache), [SSDCache](external-dicts-dict-layout.md#ssd-cache), [SSDComplexKeyCache](external-dicts-dict-layout.md#complex-key-ssd-cache) dictionaries. In [IPTrie](external-dicts-dict-layout.md#ip-trie) dictionaries `Nullable` types are not supported. | Yes |
| `type` | ClickHouse data type: [UInt8](../../../sql-reference/data-types/int-uint.md), [UInt16](../../../sql-reference/data-types/int-uint.md), [UInt32](../../../sql-reference/data-types/int-uint.md), [UInt64](../../../sql-reference/data-types/int-uint.md), [Int8](../../../sql-reference/data-types/int-uint.md), [Int16](../../../sql-reference/data-types/int-uint.md), [Int32](../../../sql-reference/data-types/int-uint.md), [Int64](../../../sql-reference/data-types/int-uint.md), [Float32](../../../sql-reference/data-types/float.md), [Float64](../../../sql-reference/data-types/float.md), [UUID](../../../sql-reference/data-types/uuid.md), [Decimal32](../../../sql-reference/data-types/decimal.md), [Decimal64](../../../sql-reference/data-types/decimal.md), [Decimal128](../../../sql-reference/data-types/decimal.md), [Decimal256](../../../sql-reference/data-types/decimal.md),[Date](../../../sql-reference/data-types/date.md), [Date32](../../../sql-reference/data-types/date32.md), [DateTime](../../../sql-reference/data-types/datetime.md), [DateTime64](../../../sql-reference/data-types/datetime64.md), [String](../../../sql-reference/data-types/string.md), [Array](../../../sql-reference/data-types/array.md).<br/>ClickHouse tries to cast value from dictionary to the specified data type. For example, for MySQL, the field might be `TEXT`, `VARCHAR`, or `BLOB` in the MySQL source table, but it can be uploaded as `String` in ClickHouse.<br/>[Nullable](../../../sql-reference/data-types/nullable.md) is currently supported for [Flat](external-dicts-dict-layout.md#flat), [Hashed](external-dicts-dict-layout.md#dicts-external_dicts_dict_layout-hashed), [ComplexKeyHashed](external-dicts-dict-layout.md#complex-key-hashed), [Direct](external-dicts-dict-layout.md#direct), [ComplexKeyDirect](external-dicts-dict-layout.md#complex-key-direct), [RangeHashed](external-dicts-dict-layout.md#range-hashed), [Polygon](external-dicts-dict-polygon.md), [Cache](external-dicts-dict-layout.md#cache), [ComplexKeyCache](external-dicts-dict-layout.md#complex-key-cache), [SSDCache](external-dicts-dict-layout.md#ssd-cache), [SSDComplexKeyCache](external-dicts-dict-layout.md#complex-key-ssd-cache) dictionaries. In [IPTrie](external-dicts-dict-layout.md#ip-trie) dictionaries `Nullable` types are not supported. | Yes |
| `null_value` | Default value for a non-existing element.<br/>In the example, it is an empty string. [NULL](../../syntax.md#null-literal) value can be used only for the `Nullable` types (see the previous line with types description). | Yes |
| `expression` | [Expression](../../../sql-reference/syntax.md#syntax-expressions) that ClickHouse executes on the value.<br/>The expression can be a column name in the remote SQL database. Thus, you can use it to create an alias for the remote column.<br/><br/>Default value: no expression. | No |
| <a name="hierarchical-dict-attr"></a> `hierarchical` | If `true`, the attribute contains the value of a parent key for the current key. See [Hierarchical Dictionaries](../../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-hierarchical.md).<br/><br/>Default value: `false`. | No |

View File

@ -252,7 +252,6 @@ CREATE TABLE codec_example
ENGINE = MergeTree()
```
### Encryption Codecs {#create-query-encryption-codecs}
These codecs don't actually compress data, but instead encrypt data on disk. These are only available when an encryption key is specified by [encryption](../../../operations/server-configuration-parameters/settings.md#server-settings-encryption) settings. Note that encryption only makes sense at the end of codec pipelines, because encrypted data usually can't be compressed in any meaningful way.
@ -260,6 +259,7 @@ These codecs don't actually compress data, but instead encrypt data on disk. The
Encryption codecs:
- `CODEC('AES-128-GCM-SIV')` — Encrypts data with AES-128 in [RFC 8452](https://tools.ietf.org/html/rfc8452) GCM-SIV mode.
- `CODEC('AES-256-GCM-SIV')` — Encrypts data with AES-256 in GCM-SIV mode.
These codecs use a fixed nonce and encryption is therefore deterministic. This makes it compatible with deduplicating engines such as [ReplicatedMergeTree](../../../engines/table-engines/mergetree-family/replication.md) but has a weakness: when the same data block is encrypted twice, the resulting ciphertext will be exactly the same so an adversary who can read the disk can see this equivalence (although only the equivalence, without getting its content).
@ -269,7 +269,7 @@ These codecs use a fixed nonce and encryption is therefore deterministic. This m
!!! attention "Attention"
If you perform a SELECT query mentioning a specific value in an encrypted column (such as in its WHERE clause), the value may appear in [system.query_log](../../../operations/system-tables/query_log.md). You may want to disable the logging.
**Example**
```sql

View File

@ -105,7 +105,7 @@ ClickHouse проверяет условия для `min_part_size` и `min_part
```xml
<encryption_codecs>
<aes_128_gcm_siv>
<key_hex from_env="KEY"></key_hex>
<key_hex from_env="ENVVAR"></key_hex>
</aes_128_gcm_siv>
</encryption_codecs>
```
@ -118,7 +118,7 @@ ClickHouse проверяет условия для `min_part_size` и `min_part
<encryption_codecs>
<aes_128_gcm_siv>
<key_hex id="0">00112233445566778899aabbccddeeff</key_hex>
<key_hex id="1" from_env=".."></key_hex>
<key_hex id="1" from_env="ENVVAR"></key_hex>
<current_key_id>1</current_key_id>
</aes_128_gcm_siv>
</encryption_codecs>

View File

@ -246,6 +246,46 @@ CREATE TABLE codec_example
ENGINE = MergeTree()
```
### Encryption Codecs {#create-query-encryption-codecs}
These codecs do not compress data; instead, they encrypt it on disk. They are only available when an encryption key is specified by the [encryption](../../../operations/server-configuration-parameters/settings.md#server-settings-encryption) settings. Note that encryption codecs only make sense at the very end of the codec pipeline, because encrypted data usually cannot be compressed in any meaningful way.
Encryption codecs:
- `CODEC('AES-128-GCM-SIV')` — Encrypts data with AES-128 in [RFC 8452](https://tools.ietf.org/html/rfc8452) GCM-SIV mode.
- `CODEC('AES-256-GCM-SIV')` — Encrypts data with AES-256 in GCM-SIV mode.
These codecs use a fixed nonce, so encryption is deterministic. This makes them compatible with deduplicating engines such as [ReplicatedMergeTree](../../../engines/table-engines/mergetree-family/replication.md), but it has a weakness: when the same data block is encrypted twice, the resulting ciphertext is identical, so an adversary with access to the disk can notice this equivalence (without, however, gaining access to the content).
!!! attention "Attention"
Most engines, including the `MergeTree` family, create index files on disk without applying codecs. This means plaintext will appear on disk if an encrypted column is indexed.
!!! attention "Attention"
If you perform a SELECT query that mentions a specific value in an encrypted column (for example, in a WHERE clause), this value may appear in [system.query_log](../../../operations/system-tables/query_log.md). We recommend disabling logging.
**Example**
```sql
CREATE TABLE mytable
(
x String Codec(AES_128_GCM_SIV)
)
ENGINE = MergeTree ORDER BY x;
```
!!! note "Note"
If compression needs to be applied, it must be specified explicitly. Otherwise, only encryption will be applied to the data.
**Example**
```sql
CREATE TABLE mytable
(
x String Codec(Delta, LZ4, AES_128_GCM_SIV)
)
ENGINE = MergeTree ORDER BY x;
```
## Temporary Tables {#temporary-tables}
ClickHouse supports temporary tables with the following characteristics:

View File

@ -145,14 +145,14 @@ enum class AccessType
M(SYSTEM_RELOAD_EMBEDDED_DICTIONARIES, "RELOAD EMBEDDED DICTIONARIES", GLOBAL, SYSTEM_RELOAD) /* implicitly enabled by the grant SYSTEM_RELOAD_DICTIONARY ON *.* */\
M(SYSTEM_RELOAD, "", GROUP, SYSTEM) \
M(SYSTEM_RESTART_DISK, "SYSTEM RESTART DISK", GLOBAL, SYSTEM) \
M(SYSTEM_MERGES, "SYSTEM STOP MERGES, SYSTEM START MERGES, STOP_MERGES, START MERGES", TABLE, SYSTEM) \
M(SYSTEM_MERGES, "SYSTEM STOP MERGES, SYSTEM START MERGES, STOP MERGES, START MERGES", TABLE, SYSTEM) \
M(SYSTEM_TTL_MERGES, "SYSTEM STOP TTL MERGES, SYSTEM START TTL MERGES, STOP TTL MERGES, START TTL MERGES", TABLE, SYSTEM) \
M(SYSTEM_FETCHES, "SYSTEM STOP FETCHES, SYSTEM START FETCHES, STOP FETCHES, START FETCHES", TABLE, SYSTEM) \
M(SYSTEM_MOVES, "SYSTEM STOP MOVES, SYSTEM START MOVES, STOP MOVES, START MOVES", TABLE, SYSTEM) \
M(SYSTEM_DISTRIBUTED_SENDS, "SYSTEM STOP DISTRIBUTED SENDS, SYSTEM START DISTRIBUTED SENDS, STOP DISTRIBUTED SENDS, START DISTRIBUTED SENDS", TABLE, SYSTEM_SENDS) \
M(SYSTEM_REPLICATED_SENDS, "SYSTEM STOP REPLICATED SENDS, SYSTEM START REPLICATED SENDS, STOP_REPLICATED_SENDS, START REPLICATED SENDS", TABLE, SYSTEM_SENDS) \
M(SYSTEM_REPLICATED_SENDS, "SYSTEM STOP REPLICATED SENDS, SYSTEM START REPLICATED SENDS, STOP REPLICATED SENDS, START REPLICATED SENDS", TABLE, SYSTEM_SENDS) \
M(SYSTEM_SENDS, "SYSTEM STOP SENDS, SYSTEM START SENDS, STOP SENDS, START SENDS", GROUP, SYSTEM) \
M(SYSTEM_REPLICATION_QUEUES, "SYSTEM STOP REPLICATION QUEUES, SYSTEM START REPLICATION QUEUES, STOP_REPLICATION_QUEUES, START REPLICATION QUEUES", TABLE, SYSTEM) \
M(SYSTEM_REPLICATION_QUEUES, "SYSTEM STOP REPLICATION QUEUES, SYSTEM START REPLICATION QUEUES, STOP REPLICATION QUEUES, START REPLICATION QUEUES", TABLE, SYSTEM) \
M(SYSTEM_DROP_REPLICA, "DROP REPLICA", TABLE, SYSTEM) \
M(SYSTEM_SYNC_REPLICA, "SYNC REPLICA", TABLE, SYSTEM) \
M(SYSTEM_RESTART_REPLICA, "RESTART REPLICA", TABLE, SYSTEM) \

View File

@ -609,6 +609,7 @@
M(638, SNAPPY_UNCOMPRESS_FAILED) \
M(639, SNAPPY_COMPRESS_FAILED) \
M(640, NO_HIVEMETASTORE) \
M(641, CANNOT_APPEND_TO_FILE) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -75,7 +75,11 @@ class IColumn;
M(UInt64, s3_max_single_read_retries, 4, "The maximum number of retries during single S3 read.", 0) \
M(UInt64, s3_max_redirects, 10, "Max number of S3 redirects hops allowed.", 0) \
M(UInt64, s3_max_connections, 1024, "The maximum number of connections per server.", 0) \
M(Bool, s3_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables.", 0) \
M(Bool, s3_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in s3 engine tables", 0) \
M(UInt64, hdfs_replication, 0, "The actual number of replications can be specified when the hdfs file is created.", 0) \
M(Bool, hdfs_truncate_on_insert, false, "Enables or disables truncate before insert in hdfs engine tables", 0) \
M(Bool, hdfs_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in hdfs engine tables", 0) \
M(UInt64, hsts_max_age, 0, "Expired time for hsts. 0 means disable HSTS.", 0) \
M(Bool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.", IMPORTANT) \
M(Bool, use_uncompressed_cache, false, "Whether to use the cache of uncompressed blocks.", 0) \
@ -490,6 +494,7 @@ class IColumn;
\
M(Bool, engine_file_empty_if_not_exists, false, "Allows selecting data from a file engine table when the file does not exist", 0) \
M(Bool, engine_file_truncate_on_insert, false, "Enables or disables truncate before insert in file engine tables", 0) \
M(Bool, engine_file_allow_create_multiple_files, false, "Enables or disables creating a new file on each insert in file engine tables if format has suffix.", 0) \
M(Bool, allow_experimental_database_replicated, false, "Allow to create databases with Replicated engine", 0) \
M(UInt64, database_replicated_initial_query_timeout_sec, 300, "How long initial DDL query should wait for Replicated database to process previous DDL queue entries", 0) \
M(UInt64, max_distributed_depth, 5, "Maximum distributed query depth", 0) \

View File

@ -8,7 +8,6 @@
#include <Common/assert_cast.h>
#include <Formats/FormatSettings.h>
#include <Formats/ProtobufReader.h>
#include <Formats/ProtobufWriter.h>
#include <Core/Field.h>
namespace DB

View File

@ -394,6 +394,27 @@ void FormatFactory::registerNonTrivialPrefixAndSuffixChecker(const String & name
target = std::move(non_trivial_prefix_and_suffix_checker);
}
void FormatFactory::registerAppendSupportChecker(const String & name, AppendSupportChecker append_support_checker)
{
auto & target = dict[name].append_support_checker;
if (target)
throw Exception("FormatFactory: Suffix checker " + name + " is already registered", ErrorCodes::LOGICAL_ERROR);
target = std::move(append_support_checker);
}
void FormatFactory::markFormatHasNoAppendSupport(const String & name)
{
registerAppendSupportChecker(name, [](const FormatSettings &){ return false; });
}
bool FormatFactory::checkIfFormatSupportAppend(const String & name, ContextPtr context, const std::optional<FormatSettings> & format_settings_)
{
auto format_settings = format_settings_ ? *format_settings_ : getFormatSettings(context);
auto & append_support_checker = dict[name].append_support_checker;
/// By default we consider that format supports append
return !append_support_checker || append_support_checker(format_settings);
}
void FormatFactory::registerOutputFormat(const String & name, OutputCreator output_creator)
{
auto & target = dict[name].output_creator;

View File

@ -93,6 +93,10 @@ private:
/// The checker should return true if parallel parsing should be disabled.
using NonTrivialPrefixAndSuffixChecker = std::function<bool(ReadBuffer & buf)>;
/// Some formats can support append depending on settings.
/// The checker should return true if the format supports append.
using AppendSupportChecker = std::function<bool(const FormatSettings & settings)>;
using SchemaReaderCreator = std::function<SchemaReaderPtr(ReadBuffer & in, const FormatSettings & settings, ContextPtr context)>;
using ExternalSchemaReaderCreator = std::function<ExternalSchemaReaderPtr(const FormatSettings & settings)>;
@ -106,6 +110,7 @@ private:
bool supports_parallel_formatting{false};
bool is_column_oriented{false};
NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker;
AppendSupportChecker append_support_checker;
};
using FormatsDictionary = std::unordered_map<String, Creators>;
@ -167,6 +172,14 @@ public:
void registerNonTrivialPrefixAndSuffixChecker(const String & name, NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker);
void registerAppendSupportChecker(const String & name, AppendSupportChecker append_support_checker);
/// If a format never supports append, you can use this method instead of
/// registerAppendSupportChecker with a checker that always returns false.
void markFormatHasNoAppendSupport(const String & name);
bool checkIfFormatSupportAppend(const String & name, ContextPtr context, const std::optional<FormatSettings> & format_settings_ = std::nullopt);
/// Register format by its name.
void registerInputFormat(const String & name, InputCreator input_creator);
void registerOutputFormat(const String & name, OutputCreator output_creator);
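For orientation, here is a minimal sketch of how these hooks fit together (not part of the commit): the format names `MyFormat` and `MyClosedFormat` are hypothetical, and `factory`, `context`, and `format_settings` are assumed to be in scope inside a format registration function and a storage sink, respectively. The checker lambda mirrors the CustomSeparated checker registered later in this diff.
```cpp
/// Sketch: appending is only safe when nothing is written after the result set.
factory.registerAppendSupportChecker("MyFormat", [](const FormatSettings & settings)
{
    return settings.custom.result_after_delimiter.empty();
});

/// A format that always writes a closing suffix opts out of appends entirely.
factory.markFormatHasNoAppendSupport("MyClosedFormat");

/// A storage sink can then ask whether an existing non-empty file may be reused:
bool can_append = FormatFactory::instance().checkIfFormatSupportAppend("MyFormat", context, format_settings);
```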

View File

@ -26,7 +26,7 @@ Lz4DeflatingWriteBuffer::Lz4DeflatingWriteBuffer(
0 /* no dictID */,
LZ4F_noBlockChecksum},
compression_level, /* compression level; 0 == default */
0, /* autoflush */
1, /* autoflush */
0, /* favor decompression speed */
{0, 0, 0}, /* reserved, must be set to 0 */
};
@ -125,6 +125,8 @@ void Lz4DeflatingWriteBuffer::nextImpl()
out->position() = out->buffer().begin();
throw;
}
out->next();
out_capacity = out->buffer().end() - out->position();
}
void Lz4DeflatingWriteBuffer::finalizeBefore()

View File

@ -70,6 +70,12 @@ bool Lz4InflatingReadBuffer::nextImpl()
return !working_buffer.empty();
}
/// It may happen that we didn't get new uncompressed data
/// (for example if we read the end of frame). Load new data
/// in this case.
if (working_buffer.empty())
return nextImpl();
return true;
}
}

View File

@ -63,7 +63,10 @@ public:
if (!res)
working_buffer = Buffer(pos, pos);
else
{
pos = working_buffer.begin() + nextimpl_working_buffer_offset;
assert(position() != working_buffer.end());
}
nextimpl_working_buffer_offset = 0;
assert(position() <= working_buffer.end());

View File

@ -4,7 +4,6 @@
#include <Common/StringUtils/StringUtils.h>
#include <Common/memcpySmall.h>
#include <Formats/FormatSettings.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/readFloatText.h>

View File

@ -141,12 +141,17 @@ void InterpreterSystemQuery::startStopAction(StorageActionBlockType action_type,
auto manager = getContext()->getActionLocksManager();
manager->cleanExpired();
auto access = getContext()->getAccess();
auto required_access_type = getRequiredAccessType(action_type);
if (volume_ptr && action_type == ActionLocks::PartsMerge)
{
access->checkAccess(required_access_type);
volume_ptr->setAvoidMergesUserOverride(!start);
}
else if (table_id)
{
access->checkAccess(required_access_type, table_id.database_name, table_id.table_name);
auto table = DatabaseCatalog::instance().tryGetTable(table_id, getContext());
if (table)
{
@ -161,7 +166,6 @@ void InterpreterSystemQuery::startStopAction(StorageActionBlockType action_type,
}
else
{
auto access = getContext()->getAccess();
for (auto & elem : DatabaseCatalog::instance().getDatabases())
{
for (auto iterator = elem.second->getTablesIterator(getContext()); iterator->isValid(); iterator->next())
@ -170,14 +174,9 @@ void InterpreterSystemQuery::startStopAction(StorageActionBlockType action_type,
if (!table)
continue;
if (!access->isGranted(getRequiredAccessType(action_type), elem.first, iterator->name()))
if (!access->isGranted(required_access_type, elem.first, iterator->name()))
{
LOG_INFO(
log,
"Access {} denied, skipping {}.{}",
toString(getRequiredAccessType(action_type)),
elem.first,
iterator->name());
LOG_INFO(log, "Access {} denied, skipping {}.{}", toString(required_access_type), elem.first, iterator->name());
continue;
}
@ -422,8 +421,7 @@ BlockIO InterpreterSystemQuery::execute()
restartReplicas(system_context);
break;
case Type::RESTART_REPLICA:
if (!tryRestartReplica(table_id, system_context))
throw Exception(ErrorCodes::BAD_ARGUMENTS, table_is_not_replicated.data(), table_id.getNameForLogs());
restartReplica(table_id, system_context);
break;
case Type::RESTORE_REPLICA:
restoreReplica();
@ -483,8 +481,6 @@ void InterpreterSystemQuery::restoreReplica()
StoragePtr InterpreterSystemQuery::tryRestartReplica(const StorageID & replica, ContextMutablePtr system_context, bool need_ddl_guard)
{
getContext()->checkAccess(AccessType::SYSTEM_RESTART_REPLICA, replica);
auto table_ddl_guard = need_ddl_guard
? DatabaseCatalog::instance().getDDLGuard(replica.getDatabaseName(), replica.getTableName())
: nullptr;
@ -529,15 +525,36 @@ StoragePtr InterpreterSystemQuery::tryRestartReplica(const StorageID & replica,
return table;
}
void InterpreterSystemQuery::restartReplica(const StorageID & replica, ContextMutablePtr system_context)
{
getContext()->checkAccess(AccessType::SYSTEM_RESTART_REPLICA, replica);
if (!tryRestartReplica(replica, system_context))
throw Exception(ErrorCodes::BAD_ARGUMENTS, table_is_not_replicated.data(), replica.getNameForLogs());
}
void InterpreterSystemQuery::restartReplicas(ContextMutablePtr system_context)
{
std::vector<StorageID> replica_names;
auto & catalog = DatabaseCatalog::instance();
auto access = getContext()->getAccess();
bool access_is_granted_globally = access->isGranted(AccessType::SYSTEM_RESTART_REPLICA);
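/// With a global SYSTEM RESTART REPLICA grant the per-table checks below are skipped; otherwise replicas the user may not restart are logged and skipped.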
for (auto & elem : catalog.getDatabases())
{
for (auto it = elem.second->getTablesIterator(getContext()); it->isValid(); it->next())
{
if (dynamic_cast<const StorageReplicatedMergeTree *>(it->table().get()))
{
if (!access_is_granted_globally && !access->isGranted(AccessType::SYSTEM_RESTART_REPLICA, elem.first, it->name()))
{
LOG_INFO(log, "Access {} denied, skipping {}.{}", "SYSTEM RESTART REPLICA", elem.first, it->name());
continue;
}
replica_names.emplace_back(it->databaseName(), it->name());
}
}
}
if (replica_names.empty())
return;
@ -583,14 +600,22 @@ void InterpreterSystemQuery::dropReplica(ASTSystemQuery & query)
}
else if (query.is_drop_whole_replica)
{
getContext()->checkAccess(AccessType::SYSTEM_DROP_REPLICA);
auto databases = DatabaseCatalog::instance().getDatabases();
auto access = getContext()->getAccess();
bool access_is_granted_globally = access->isGranted(AccessType::SYSTEM_DROP_REPLICA);
for (auto & elem : databases)
{
DatabasePtr & database = elem.second;
for (auto iterator = database->getTablesIterator(getContext()); iterator->isValid(); iterator->next())
{
if (!access_is_granted_globally && !access->isGranted(AccessType::SYSTEM_DROP_REPLICA, elem.first, iterator->name()))
{
LOG_INFO(log, "Access {} denied, skipping {}.{}", "SYSTEM DROP REPLICA", elem.first, iterator->name());
continue;
}
dropReplicaImpl(query, iterator->table());
}
LOG_TRACE(log, "Dropped replica {} from database {}", query.replica, backQuoteIfNeed(database->getDatabaseName()));
}
}

View File

@ -47,6 +47,7 @@ private:
/// Returns pointer to a newly created table if the restart was successful
StoragePtr tryRestartReplica(const StorageID & replica, ContextMutablePtr context, bool need_ddl_guard = true);
void restartReplica(const StorageID & replica, ContextMutablePtr system_context);
void restartReplicas(ContextMutablePtr system_context);
void syncReplica(ASTSystemQuery & query);

View File

@ -93,6 +93,7 @@ void registerOutputFormatArrow(FormatFactory & factory)
{
return std::make_shared<ArrowBlockOutputFormat>(buf, sample, false, format_settings);
});
factory.markFormatHasNoAppendSupport("Arrow");
factory.registerOutputFormat(
"ArrowStream",
@ -103,6 +104,7 @@ void registerOutputFormatArrow(FormatFactory & factory)
{
return std::make_shared<ArrowBlockOutputFormat>(buf, sample, true, format_settings);
});
factory.markFormatHasNoAppendSupport("ArrowStream");
}
}

View File

@ -479,6 +479,7 @@ void registerOutputFormatAvro(FormatFactory & factory)
{
return std::make_shared<AvroRowOutputFormat>(buf, sample, params, settings);
});
factory.markFormatHasNoAppendSupport("Avro");
}
}

View File

@ -91,6 +91,11 @@ void registerOutputFormatCustomSeparated(FormatFactory & factory)
});
factory.markOutputFormatSupportsParallelFormatting(format_name);
factory.registerAppendSupportChecker(format_name, [](const FormatSettings & settings)
{
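/// Appending is only possible when nothing is written after the result set (the result_after_delimiter setting is empty).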
return settings.custom.result_after_delimiter.empty();
});
};
registerWithNamesAndTypes("CustomSeparated", register_func);

View File

@ -284,6 +284,7 @@ void registerOutputFormatJSON(FormatFactory & factory)
});
factory.markOutputFormatSupportsParallelFormatting("JSON");
factory.markFormatHasNoAppendSupport("JSON");
factory.registerOutputFormat("JSONStrings", [](
WriteBuffer & buf,
@ -295,6 +296,7 @@ void registerOutputFormatJSON(FormatFactory & factory)
});
factory.markOutputFormatSupportsParallelFormatting("JSONStrings");
factory.markFormatHasNoAppendSupport("JSONStrings");
}
}

View File

@ -526,6 +526,7 @@ void registerOutputFormatORC(FormatFactory & factory)
{
return std::make_shared<ORCBlockOutputFormat>(buf, sample, format_settings);
});
factory.markFormatHasNoAppendSupport("ORC");
}
}

View File

@ -85,6 +85,7 @@ void registerOutputFormatParquet(FormatFactory & factory)
{
return std::make_shared<ParquetBlockOutputFormat>(buf, sample, format_settings);
});
factory.markFormatHasNoAppendSupport("Parquet");
}
}

View File

@ -235,5 +235,19 @@ void registerOutputFormatTemplate(FormatFactory & factory)
return std::make_shared<TemplateBlockOutputFormat>(sample, buf, settings, resultset_format, row_format, settings.template_settings.row_between_delimiter);
});
factory.registerAppendSupportChecker("Template", [](const FormatSettings & settings)
{
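/// Appending is possible when no result-set template is configured, or when the template writes nothing after the last row (its trailing delimiter is empty).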
if (settings.template_settings.resultset_format.empty())
return true;
auto resultset_format = ParsedTemplateFormatString(
FormatSchemaInfo(settings.template_settings.resultset_format, "Template", false,
settings.schema.is_server, settings.schema.format_schema_path),
[&](const String & partName)
{
return static_cast<size_t>(TemplateBlockOutputFormat::stringToResultsetPart(partName));
});
return resultset_format.delimiters.empty() || resultset_format.delimiters.back().empty();
});
}
}

View File

@ -256,6 +256,7 @@ void registerOutputFormatXML(FormatFactory & factory)
});
factory.markOutputFormatSupportsParallelFormatting("XML");
factory.markFormatHasNoAppendSupport("XML");
}
}

View File

@ -14,9 +14,8 @@
#include <Processors/Formats/IInputFormat.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
@ -28,7 +27,6 @@
#include <Storages/HDFS/WriteBufferFromHDFS.h>
#include <Storages/PartitionedSink.h>
#include <Formats/ReadSchemaUtils.h>
#include <Formats/FormatFactory.h>
#include <Functions/FunctionsConversion.h>
@ -52,7 +50,9 @@ namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int ACCESS_DENIED;
extern const int DATABASE_ACCESS_DENIED;
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
}
namespace
@ -139,20 +139,23 @@ StorageHDFS::StorageHDFS(
ASTPtr partition_by_)
: IStorage(table_id_)
, WithContext(context_)
, uri(uri_)
, uris({uri_})
, format_name(format_name_)
, compression_method(compression_method_)
, distributed_processing(distributed_processing_)
, partition_by(partition_by_)
{
context_->getRemoteHostFilter().checkURL(Poco::URI(uri));
checkHDFSURL(uri);
context_->getRemoteHostFilter().checkURL(Poco::URI(uri_));
checkHDFSURL(uri_);
String path = uri_.substr(uri_.find('/', uri_.find("//") + 2));
is_path_with_globs = path.find_first_of("*?{") != std::string::npos;
StorageInMemoryMetadata storage_metadata;
if (columns_.empty())
{
auto columns = getTableStructureFromData(format_name, uri, compression_method, context_);
auto columns = getTableStructureFromData(format_name, uri_, compression_method, context_);
storage_metadata.setColumns(columns);
}
else
@ -217,6 +220,39 @@ private:
Strings::iterator uris_iter;
};
class HDFSSource::URISIterator::Impl
{
public:
explicit Impl(const std::vector<const String> & uris_, ContextPtr context)
{
auto path_and_uri = getPathFromUriAndUriWithoutPath(uris_[0]);
HDFSBuilderWrapper builder = createHDFSBuilder(path_and_uri.second + "/", context->getGlobalContext()->getConfigRef());
HDFSFSPtr fs = createHDFSFS(builder.get());
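/// Keep only the URIs whose paths currently exist on HDFS (hdfsExists() returns 0 when the path exists).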
for (const auto & uri : uris_)
{
path_and_uri = getPathFromUriAndUriWithoutPath(uri);
if (!hdfsExists(fs.get(), path_and_uri.first.c_str()))
uris.push_back(uri);
}
uris_iter = uris.begin();
}
String next()
{
std::lock_guard lock(mutex);
if (uris_iter == uris.end())
return "";
auto key = *uris_iter;
++uris_iter;
return key;
}
private:
std::mutex mutex;
Strings uris;
Strings::iterator uris_iter;
};
Block HDFSSource::getHeader(const StorageMetadataPtr & metadata_snapshot, bool need_path_column, bool need_file_column)
{
auto header = metadata_snapshot->getSampleBlock();
@ -250,6 +286,15 @@ String HDFSSource::DisclosedGlobIterator::next()
return pimpl->next();
}
HDFSSource::URISIterator::URISIterator(const std::vector<const String> & uris_, ContextPtr context)
: pimpl(std::make_shared<HDFSSource::URISIterator::Impl>(uris_, context))
{
}
String HDFSSource::URISIterator::next()
{
return pimpl->next();
}
HDFSSource::HDFSSource(
StorageHDFSPtr storage_,
@ -284,9 +329,8 @@ bool HDFSSource::initialize()
current_path = (*file_iterator)();
if (current_path.empty())
return false;
const size_t begin_of_path = current_path.find('/', current_path.find("//") + 2);
const String path_from_uri = current_path.substr(begin_of_path);
const String uri_without_path = current_path.substr(0, begin_of_path);
const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(current_path);
auto compression = chooseCompressionMethod(path_from_uri, storage->compression_method);
read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(uri_without_path, path_from_uri, getContext()->getGlobalContext()->getConfigRef()), compression);
@ -469,15 +513,23 @@ Pipe StorageHDFS::read(
return callback();
});
}
else
else if (is_path_with_globs)
{
/// Iterate through disclosed globs and make a source for each file
auto glob_iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(context_, uri);
auto glob_iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(context_, uris[0]);
iterator_wrapper = std::make_shared<HDFSSource::IteratorWrapper>([glob_iterator]()
{
return glob_iterator->next();
});
}
else
{
auto uris_iterator = std::make_shared<HDFSSource::URISIterator>(uris, context_);
iterator_wrapper = std::make_shared<HDFSSource::IteratorWrapper>([uris_iterator]()
{
return uris_iterator->next();
});
}
Pipes pipes;
auto this_ptr = std::static_pointer_cast<StorageHDFS>(shared_from_this());
@ -505,9 +557,11 @@ Pipe StorageHDFS::read(
return Pipe::unitePipes(std::move(pipes));
}
SinkToStoragePtr StorageHDFS::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/)
SinkToStoragePtr StorageHDFS::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context_)
{
bool has_wildcards = uri.find(PartitionedSink::PARTITION_ID_WILDCARD) != String::npos;
String current_uri = uris.back();
bool has_wildcards = current_uri.find(PartitionedSink::PARTITION_ID_WILDCARD) != String::npos;
const auto * insert_query = dynamic_cast<const ASTInsertQuery *>(query.get());
auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr;
bool is_partitioned_implementation = partition_by_ast && has_wildcards;
@ -516,34 +570,70 @@ SinkToStoragePtr StorageHDFS::write(const ASTPtr & query, const StorageMetadataP
{
return std::make_shared<PartitionedHDFSSink>(
partition_by_ast,
uri,
current_uri,
format_name,
metadata_snapshot->getSampleBlock(),
getContext(),
chooseCompressionMethod(uri, compression_method));
context_,
chooseCompressionMethod(current_uri, compression_method));
}
else
{
return std::make_shared<HDFSSink>(uri,
if (is_path_with_globs)
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "URI '{}' contains globs, so the table is in readonly mode", uris.back());
const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(current_uri);
HDFSBuilderWrapper builder = createHDFSBuilder(uri_without_path + "/", context_->getGlobalContext()->getConfigRef());
HDFSFSPtr fs = createHDFSFS(builder.get());
bool truncate_on_insert = context_->getSettingsRef().hdfs_truncate_on_insert;
if (!truncate_on_insert && !hdfsExists(fs.get(), path_from_uri.c_str()))
{
if (context_->getSettingsRef().hdfs_create_new_file_on_insert)
{
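/// Build a new URI by inserting an index before the file extension (e.g. .../data.parquet -> .../data.1.parquet), increasing it until a path that does not exist yet is found.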
auto pos = uris[0].find_first_of('.', uris[0].find_last_of('/'));
size_t index = uris.size();
String new_uri;
do
{
new_uri = uris[0].substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : uris[0].substr(pos));
++index;
}
while (!hdfsExists(fs.get(), new_uri.c_str()));
uris.push_back(new_uri);
current_uri = new_uri;
}
else
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"File with path {} already exists. If you want to overwrite it, enable setting hdfs_truncate_on_insert, "
"if you want to create new file on each insert, enable setting hdfs_create_new_file_on_insert",
path_from_uri);
}
return std::make_shared<HDFSSink>(current_uri,
format_name,
metadata_snapshot->getSampleBlock(),
getContext(),
chooseCompressionMethod(uri, compression_method));
context_,
chooseCompressionMethod(current_uri, compression_method));
}
}
void StorageHDFS::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &, ContextPtr local_context, TableExclusiveLockHolder &)
{
const size_t begin_of_path = uri.find('/', uri.find("//") + 2);
const String path = uri.substr(begin_of_path);
const String url = uri.substr(0, begin_of_path);
const size_t begin_of_path = uris[0].find('/', uris[0].find("//") + 2);
const String url = uris[0].substr(0, begin_of_path);
HDFSBuilderWrapper builder = createHDFSBuilder(url + "/", local_context->getGlobalContext()->getConfigRef());
HDFSFSPtr fs = createHDFSFS(builder.get());
int ret = hdfsDelete(fs.get(), path.data(), 0);
if (ret)
throw Exception(ErrorCodes::ACCESS_DENIED, "Unable to truncate hdfs table: {}", std::string(hdfsGetLastError()));
for (const auto & uri : uris)
{
const String path = uri.substr(begin_of_path);
int ret = hdfsDelete(fs.get(), path.data(), 0);
if (ret)
throw Exception(ErrorCodes::ACCESS_DENIED, "Unable to truncate hdfs table: {}", std::string(hdfsGetLastError()));
}
}

View File

@ -31,7 +31,7 @@ public:
size_t max_block_size,
unsigned num_streams) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr /*context*/) override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) override;
void truncate(
const ASTPtr & query,
@ -70,11 +70,12 @@ protected:
ASTPtr partition_by = nullptr);
private:
const String uri;
std::vector<const String> uris;
String format_name;
String compression_method;
const bool distributed_processing;
ASTPtr partition_by;
bool is_path_with_globs;
Poco::Logger * log = &Poco::Logger::get("StorageHDFS");
};
@ -95,6 +96,17 @@ public:
std::shared_ptr<Impl> pimpl;
};
class URISIterator
{
public:
URISIterator(const std::vector<const String> & uris_, ContextPtr context);
String next();
private:
class Impl;
/// shared_ptr to have copy constructor
std::shared_ptr<Impl> pimpl;
};
using IteratorWrapper = std::function<String()>;
using StorageHDFSPtr = std::shared_ptr<StorageHDFS>;

View File

@ -15,7 +15,6 @@ namespace ErrorCodes
extern const int NETWORK_ERROR;
extern const int CANNOT_OPEN_FILE;
extern const int CANNOT_FSYNC;
extern const int BAD_ARGUMENTS;
}
@ -38,12 +37,6 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
const size_t begin_of_path = hdfs_uri.find('/', hdfs_uri.find("//") + 2);
const String path = hdfs_uri.substr(begin_of_path);
if (path.find_first_of("*?{") != std::string::npos)
throw Exception(ErrorCodes::CANNOT_OPEN_FILE, "URI '{}' contains globs, so the table is in readonly mode", hdfs_uri);
if (!hdfsExists(fs.get(), path.c_str()))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "File {} already exists", path);
fout = hdfsOpenFile(fs.get(), path.c_str(), flags, 0, replication_, 0); /// O_WRONLY means create or overwrite, i.e. it implies O_TRUNC here
if (fout == nullptr)

View File

@ -65,6 +65,7 @@ namespace ErrorCodes
extern const int INCOMPATIBLE_COLUMNS;
extern const int CANNOT_STAT;
extern const int LOGICAL_ERROR;
extern const int CANNOT_APPEND_TO_FILE;
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
}
@ -285,6 +286,7 @@ StorageFile::StorageFile(const std::string & table_path_, const std::string & us
{
is_db_table = false;
paths = getPathsList(table_path_, user_files_path, args.getContext(), total_bytes_to_read);
is_path_with_globs = paths.size() > 1;
path_for_partitioned_write = table_path_;
setStorageMetadata(args);
}
@ -603,7 +605,7 @@ public:
int table_fd_,
bool use_table_fd_,
std::string base_path_,
std::vector<std::string> paths_,
std::string path_,
const CompressionMethod compression_method_,
const std::optional<FormatSettings> & format_settings_,
const String format_name_,
@ -615,7 +617,7 @@ public:
, table_fd(table_fd_)
, use_table_fd(use_table_fd_)
, base_path(base_path_)
, paths(paths_)
, path(path_)
, compression_method(compression_method_)
, format_name(format_name_)
, format_settings(format_settings_)
@ -632,7 +634,7 @@ public:
int table_fd_,
bool use_table_fd_,
std::string base_path_,
std::vector<std::string> paths_,
const std::string & path_,
const CompressionMethod compression_method_,
const std::optional<FormatSettings> & format_settings_,
const String format_name_,
@ -644,7 +646,7 @@ public:
, table_fd(table_fd_)
, use_table_fd(use_table_fd_)
, base_path(base_path_)
, paths(paths_)
, path(path_)
, compression_method(compression_method_)
, format_name(format_name_)
, format_settings(format_settings_)
@ -666,10 +668,8 @@ public:
}
else
{
if (paths.size() != 1)
throw Exception("Table '" + table_name_for_log + "' is in readonly mode because of globs in filepath", ErrorCodes::DATABASE_ACCESS_DENIED);
flags |= O_WRONLY | O_APPEND | O_CREAT;
naked_buffer = std::make_unique<WriteBufferFromFile>(paths[0], DBMS_DEFAULT_BUFFER_SIZE, flags);
naked_buffer = std::make_unique<WriteBufferFromFile>(path, DBMS_DEFAULT_BUFFER_SIZE, flags);
}
/// In case of formats with prefixes if file is not empty we have already written prefix.
@ -709,7 +709,7 @@ private:
int table_fd;
bool use_table_fd;
std::string base_path;
std::vector<std::string> paths;
std::string path;
CompressionMethod compression_method;
std::string format_name;
std::optional<FormatSettings> format_settings;
@ -752,7 +752,6 @@ public:
{
auto partition_path = PartitionedSink::replaceWildcards(path, partition_id);
PartitionedSink::validatePartitionKey(partition_path, true);
Strings result_paths = {partition_path};
checkCreationIsAllowed(context, context->getUserFilesPath(), partition_path);
return std::make_shared<StorageFileSink>(
metadata_snapshot,
@ -760,7 +759,7 @@ public:
-1,
/* use_table_fd */false,
base_path,
result_paths,
partition_path,
compression_method,
format_settings,
format_name,
@ -794,7 +793,6 @@ SinkToStoragePtr StorageFile::write(
int flags = 0;
std::string path;
if (context->getSettingsRef().engine_file_truncate_on_insert)
flags |= O_TRUNC;
@ -815,7 +813,7 @@ SinkToStoragePtr StorageFile::write(
std::unique_lock{rwlock, getLockTimeout(context)},
base_path,
path_for_partitioned_write,
chooseCompressionMethod(path, compression_method),
chooseCompressionMethod(path_for_partitioned_write, compression_method),
format_settings,
format_name,
context,
@ -823,10 +821,41 @@ SinkToStoragePtr StorageFile::write(
}
else
{
String path;
if (!paths.empty())
{
path = paths[0];
if (is_path_with_globs)
throw Exception("Table '" + getStorageID().getNameForLogs() + "' is in readonly mode because of globs in filepath", ErrorCodes::DATABASE_ACCESS_DENIED);
path = paths.back();
fs::create_directories(fs::path(path).parent_path());
if (!context->getSettingsRef().engine_file_truncate_on_insert && !is_path_with_globs
&& !FormatFactory::instance().checkIfFormatSupportAppend(format_name, context, format_settings) && fs::exists(paths.back())
&& fs::file_size(paths.back()) != 0)
{
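/// The target file is non-empty and its format cannot be appended to: either pick a fresh file name or fail.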
if (context->getSettingsRef().engine_file_allow_create_multiple_files)
{
auto pos = paths[0].find_first_of('.', paths[0].find_last_of('/'));
size_t index = paths.size();
String new_path;
do
{
new_path = paths[0].substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : paths[0].substr(pos));
++index;
}
while (fs::exists(new_path));
paths.push_back(new_path);
path = new_path;
}
else
throw Exception(
ErrorCodes::CANNOT_APPEND_TO_FILE,
"Cannot append data in format {} to file, because this format doesn't support appends."
" You can allow to create a new file "
"on each insert by enabling setting engine_file_allow_create_multiple_files",
format_name);
}
}
return std::make_shared<StorageFileSink>(
@ -836,7 +865,7 @@ SinkToStoragePtr StorageFile::write(
table_fd,
use_table_fd,
base_path,
paths,
path,
chooseCompressionMethod(path, compression_method),
format_settings,
format_name,
@ -882,7 +911,7 @@ void StorageFile::truncate(
ContextPtr /* context */,
TableExclusiveLockHolder &)
{
if (paths.size() != 1)
if (is_path_with_globs)
throw Exception("Can't truncate table '" + getStorageID().getNameForLogs() + "' in readonly mode", ErrorCodes::DATABASE_ACCESS_DENIED);
if (use_table_fd)
@ -892,11 +921,14 @@ void StorageFile::truncate(
}
else
{
if (!fs::exists(paths[0]))
return;
for (const auto & path : paths)
{
if (!fs::exists(path))
continue;
if (0 != ::truncate(paths[0].c_str(), 0))
throwFromErrnoWithPath("Cannot truncate file " + paths[0], paths[0], ErrorCodes::CANNOT_TRUNCATE_FILE);
if (0 != ::truncate(path.c_str(), 0))
throwFromErrnoWithPath("Cannot truncate file " + path, path, ErrorCodes::CANNOT_TRUNCATE_FILE);
}
}
}

View File

@ -120,6 +120,8 @@ private:
size_t total_bytes_to_read = 0;
String path_for_partitioned_write;
bool is_path_with_globs = false;
};
}

View File

@ -68,7 +68,7 @@ namespace ErrorCodes
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int S3_ERROR;
extern const int UNEXPECTED_EXPRESSION;
extern const int CANNOT_OPEN_FILE;
extern const int DATABASE_ACCESS_DENIED;
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
}
@ -82,8 +82,6 @@ public:
Impl(Aws::S3::S3Client & client_, const S3::URI & globbed_uri_)
: client(client_), globbed_uri(globbed_uri_)
{
std::lock_guard lock(mutex);
if (globbed_uri.bucket.find_first_of("*?{") != globbed_uri.bucket.npos)
throw Exception("Expression can not have wildcards inside bucket name", ErrorCodes::UNEXPECTED_EXPRESSION);
@ -176,6 +174,37 @@ String StorageS3Source::DisclosedGlobIterator::next()
return pimpl->next();
}
class StorageS3Source::KeysIterator::Impl
{
public:
explicit Impl(const std::vector<String> & keys_) : keys(keys_), keys_iter(keys.begin())
{
}
String next()
{
std::lock_guard lock(mutex);
if (keys_iter == keys.end())
return "";
auto key = *keys_iter;
++keys_iter;
return key;
}
private:
std::mutex mutex;
Strings keys;
Strings::iterator keys_iter;
};
StorageS3Source::KeysIterator::KeysIterator(const std::vector<String> & keys_) : pimpl(std::make_shared<StorageS3Source::KeysIterator::Impl>(keys_))
{
}
String StorageS3Source::KeysIterator::next()
{
return pimpl->next();
}
Block StorageS3Source::getHeader(Block sample_block, bool with_path_column, bool with_file_column)
{
@ -296,6 +325,39 @@ Chunk StorageS3Source::generate()
return generate();
}
static bool checkIfObjectExists(const std::shared_ptr<Aws::S3::S3Client> & client, const String & bucket, const String & key)
{
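/// Lists objects with the given key as a prefix and looks for an exact match, paging through results via the continuation token.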
bool is_finished = false;
Aws::S3::Model::ListObjectsV2Request request;
Aws::S3::Model::ListObjectsV2Outcome outcome;
request.SetBucket(bucket);
request.SetPrefix(key);
while (!is_finished)
{
outcome = client->ListObjectsV2(request);
if (!outcome.IsSuccess())
throw Exception(
ErrorCodes::S3_ERROR,
"Could not list objects in bucket {} with key {}, S3 exception: {}, message: {}",
quoteString(bucket),
quoteString(key),
backQuote(outcome.GetError().GetExceptionName()),
quoteString(outcome.GetError().GetMessage()));
const auto & result_batch = outcome.GetResult().GetContents();
for (const auto & obj : result_batch)
{
if (obj.GetKey() == key)
return true;
}
request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
is_finished = !outcome.GetResult().GetIsTruncated();
}
return false;
}
class StorageS3Sink : public SinkToStorage
{
@ -315,9 +377,6 @@ public:
, sample_block(sample_block_)
, format_settings(format_settings_)
{
if (key.find_first_of("*?{") != std::string::npos)
throw Exception(ErrorCodes::CANNOT_OPEN_FILE, "S3 key '{}' contains globs, so the table is in readonly mode", key);
write_buf = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromS3>(client, bucket, key, min_upload_part_size, max_single_part_upload_size), compression_method, 3);
writer = FormatFactory::instance().getOutputFormatParallelIfPossible(format, *write_buf, sample_block, context, {}, format_settings);
@ -419,7 +478,6 @@ private:
std::optional<FormatSettings> format_settings;
ExpressionActionsPtr partition_by_expr;
String partition_by_column_name;
static void validateBucket(const String & str)
{
@ -468,6 +526,7 @@ StorageS3::StorageS3(
ASTPtr partition_by_)
: IStorage(table_id_)
, client_auth{uri_, access_key_id_, secret_access_key_, max_connections_, {}, {}} /// Client and settings will be updated later
, keys({uri_.key})
, format_name(format_name_)
, max_single_read_retries(max_single_read_retries_)
, min_upload_part_size(min_upload_part_size_)
@ -477,6 +536,7 @@ StorageS3::StorageS3(
, distributed_processing(distributed_processing_)
, format_settings(format_settings_)
, partition_by(partition_by_)
, is_key_with_globs(uri_.key.find_first_of("*?{") != std::string::npos)
{
context_->getGlobalContext()->getRemoteHostFilter().checkURL(uri_.uri);
StorageInMemoryMetadata storage_metadata;
@ -484,7 +544,7 @@ StorageS3::StorageS3(
updateClientAndAuthSettings(context_, client_auth);
if (columns_.empty())
{
auto columns = getTableStructureFromDataImpl(format_name, client_auth, max_single_read_retries_, compression_method, distributed_processing_, format_settings, context_);
auto columns = getTableStructureFromDataImpl(format_name, client_auth, max_single_read_retries_, compression_method, distributed_processing_, is_key_with_globs, format_settings, context_);
storage_metadata.setColumns(columns);
}
else
@ -495,9 +555,8 @@ StorageS3::StorageS3(
setInMemoryMetadata(storage_metadata);
}
std::shared_ptr<StorageS3Source::IteratorWrapper> StorageS3::createFileIterator(const ClientAuthentication & client_auth, bool distributed_processing, ContextPtr local_context)
std::shared_ptr<StorageS3Source::IteratorWrapper> StorageS3::createFileIterator(const ClientAuthentication & client_auth, const std::vector<String> & keys, bool is_key_with_globs, bool distributed_processing, ContextPtr local_context)
{
std::shared_ptr<StorageS3Source::IteratorWrapper> iterator_wrapper{nullptr};
if (distributed_processing)
{
return std::make_shared<StorageS3Source::IteratorWrapper>(
@ -505,13 +564,23 @@ std::shared_ptr<StorageS3Source::IteratorWrapper> StorageS3::createFileIterator(
return callback();
});
}
/// Iterate through disclosed globs and make a source for each file
auto glob_iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(*client_auth.client, client_auth.uri);
return std::make_shared<StorageS3Source::IteratorWrapper>([glob_iterator]()
else if (is_key_with_globs)
{
return glob_iterator->next();
});
/// Iterate through disclosed globs and make a source for each file
auto glob_iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(*client_auth.client, client_auth.uri);
return std::make_shared<StorageS3Source::IteratorWrapper>([glob_iterator]()
{
return glob_iterator->next();
});
}
else
{
auto keys_iterator = std::make_shared<StorageS3Source::KeysIterator>(keys);
return std::make_shared<StorageS3Source::IteratorWrapper>([keys_iterator]()
{
return keys_iterator->next();
});
}
}
Pipe StorageS3::read(
@ -536,7 +605,7 @@ Pipe StorageS3::read(
need_file_column = true;
}
std::shared_ptr<StorageS3Source::IteratorWrapper> iterator_wrapper = createFileIterator(client_auth, distributed_processing, local_context);
std::shared_ptr<StorageS3Source::IteratorWrapper> iterator_wrapper = createFileIterator(client_auth, keys, is_key_with_globs, distributed_processing, local_context);
for (size_t i = 0; i < num_streams; ++i)
{
@ -567,8 +636,8 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
updateClientAndAuthSettings(local_context, client_auth);
auto sample_block = metadata_snapshot->getSampleBlock();
auto chosen_compression_method = chooseCompressionMethod(client_auth.uri.key, compression_method);
bool has_wildcards = client_auth.uri.bucket.find(PARTITION_ID_WILDCARD) != String::npos || client_auth.uri.key.find(PARTITION_ID_WILDCARD) != String::npos;
auto chosen_compression_method = chooseCompressionMethod(keys.back(), compression_method);
bool has_wildcards = client_auth.uri.bucket.find(PARTITION_ID_WILDCARD) != String::npos || keys.back().find(PARTITION_ID_WILDCARD) != String::npos;
auto insert_query = std::dynamic_pointer_cast<ASTInsertQuery>(query);
auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr;
@ -585,12 +654,41 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
chosen_compression_method,
client_auth.client,
client_auth.uri.bucket,
client_auth.uri.key,
keys.back(),
min_upload_part_size,
max_single_part_upload_size);
}
else
{
if (is_key_with_globs)
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "S3 key '{}' contains globs, so the table is in readonly mode", client_auth.uri.key);
bool truncate_in_insert = local_context->getSettingsRef().s3_truncate_on_insert;
if (!truncate_in_insert && checkIfObjectExists(client_auth.client, client_auth.uri.bucket, keys.back()))
{
if (local_context->getSettingsRef().s3_create_new_file_on_insert)
{
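/// Build a new key by inserting an index before the first dot of the original key (e.g. data.csv -> data.1.csv), increasing it until a key that does not exist yet is found.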
size_t index = keys.size();
auto pos = keys[0].find_first_of('.');
String new_key;
do
{
new_key = keys[0].substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : keys[0].substr(pos));
++index;
}
while (checkIfObjectExists(client_auth.client, client_auth.uri.bucket, new_key));
keys.push_back(new_key);
}
else
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Object in bucket {} with key {} already exists. If you want to overwrite it, enable setting s3_truncate_on_insert, if you "
"want to create a new file on each insert, enable setting s3_create_new_file_on_insert",
client_auth.uri.bucket,
keys.back());
}
return std::make_shared<StorageS3Sink>(
format_name,
sample_block,
@ -599,7 +697,7 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
chosen_compression_method,
client_auth.client,
client_auth.uri.bucket,
client_auth.uri.key,
keys.back(),
min_upload_part_size,
max_single_part_upload_size);
}
@ -610,11 +708,17 @@ void StorageS3::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &,
{
updateClientAndAuthSettings(local_context, client_auth);
Aws::S3::Model::ObjectIdentifier obj;
obj.SetKey(client_auth.uri.key);
if (is_key_with_globs)
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "S3 key '{}' contains globs, so the table is in readonly mode", client_auth.uri.key);
Aws::S3::Model::Delete delkeys;
delkeys.AddObjects(std::move(obj));
for (const auto & key : keys)
{
Aws::S3::Model::ObjectIdentifier obj;
obj.SetKey(key);
delkeys.AddObjects(std::move(obj));
}
Aws::S3::Model::DeleteObjectsRequest request;
request.SetBucket(client_auth.uri.bucket);
@ -734,7 +838,7 @@ ColumnsDescription StorageS3::getTableStructureFromData(
{
ClientAuthentication client_auth{uri, access_key_id, secret_access_key, max_connections, {}, {}};
updateClientAndAuthSettings(ctx, client_auth);
return getTableStructureFromDataImpl(format, client_auth, max_single_read_retries, compression_method, distributed_processing, format_settings, ctx);
return getTableStructureFromDataImpl(format, client_auth, max_single_read_retries, compression_method, distributed_processing, uri.key.find_first_of("*?{") != std::string::npos, format_settings, ctx);
}
ColumnsDescription StorageS3::getTableStructureFromDataImpl(
@ -743,12 +847,14 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
UInt64 max_single_read_retries,
const String & compression_method,
bool distributed_processing,
bool is_key_with_globs,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx)
{
std::vector<String> keys = {client_auth.uri.key};
auto read_buffer_creator = [&]()
{
auto file_iterator = createFileIterator(client_auth, distributed_processing, ctx);
auto file_iterator = createFileIterator(client_auth, keys, is_key_with_globs, distributed_processing, ctx);
String current_key = (*file_iterator)();
if (current_key.empty())
throw Exception(

View File

@ -44,6 +44,18 @@ public:
std::shared_ptr<Impl> pimpl;
};
class KeysIterator
{
public:
explicit KeysIterator(const std::vector<String> & keys_);
String next();
private:
class Impl;
/// shared_ptr to have copy constructor
std::shared_ptr<Impl> pimpl;
};
using IteratorWrapper = std::function<String()>;
static Block getHeader(Block sample_block, bool with_path_column, bool with_file_column);
@ -174,6 +186,7 @@ private:
};
ClientAuthentication client_auth;
std::vector<String> keys;
String format_name;
UInt64 max_single_read_retries;
@ -184,10 +197,11 @@ private:
const bool distributed_processing;
std::optional<FormatSettings> format_settings;
ASTPtr partition_by;
bool is_key_with_globs = false;
static void updateClientAndAuthSettings(ContextPtr, ClientAuthentication &);
static std::shared_ptr<StorageS3Source::IteratorWrapper> createFileIterator(const ClientAuthentication & client_auth, bool distributed_processing, ContextPtr local_context);
static std::shared_ptr<StorageS3Source::IteratorWrapper> createFileIterator(const ClientAuthentication & client_auth, const std::vector<String> & keys, bool is_key_with_globs, bool distributed_processing, ContextPtr local_context);
static ColumnsDescription getTableStructureFromDataImpl(
const String & format,
@ -195,6 +209,7 @@ private:
UInt64 max_single_read_retries,
const String & compression_method,
bool distributed_processing,
bool is_key_with_globs,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx);
};

View File

@ -38,9 +38,22 @@ class DockerImage:
self.parent = parent
self.built = False
def __eq__(self, other):
def __eq__(self, other) -> bool: # type: ignore
"""Is used to check if DockerImage is in a set or not"""
return self.path == other.path
return self.path == other.path and self.repo == other.repo
def __lt__(self, other) -> bool:
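"""Used for sorting: images without a parent come first, then images are ordered by path and repo."""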
if not isinstance(other, DockerImage):
return False
if self.parent and not other.parent:
return False
if not self.parent and other.parent:
return True
if self.path < other.path:
return True
if self.repo < other.repo:
return True
return False
def __hash__(self):
return hash(self.path)
@ -49,7 +62,7 @@ class DockerImage:
return self.repo
def __repr__(self):
return f"DockerImage(path={self.path},path={self.path},parent={self.parent})"
return f"DockerImage(path={self.path},repo={self.repo},parent={self.parent})"
def get_changed_docker_images(
@ -105,7 +118,9 @@ def get_changed_docker_images(
dependent,
image,
)
changed_images.append(DockerImage(dependent, image.repo, image))
changed_images.append(
DockerImage(dependent, images_dict[dependent]["name"], image)
)
index += 1
if index > 5 * len(images_dict):
# Sanity check to prevent infinite loop.
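Taken together, `__eq__`, `__lt__` and `__hash__` give `DockerImage` value semantics, which is what lets the updated unit test below deduplicate images in a set and compare sorted lists. A short usage sketch, assuming the class above (the concrete paths are only examples):

    base = DockerImage("docker/test/base", "clickhouse/test-base")
    stateless = DockerImage("docker/test/stateless", "clickhouse/stateless-test", base)

    # Duplicates collapse via __hash__/__eq__; __lt__ sorts parentless images first,
    # then falls back to path and repo, so the order is stable across runs.
    assert sorted({base, stateless, base}) == [base, stateless]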


@ -22,24 +22,59 @@ class TestDockerImageCheck(unittest.TestCase):
"docker/test/base",
"docker/docs/builder",
}
images = di.get_changed_docker_images(pr_info, "/", self.docker_images_path)
expected = {
di.DockerImage("docker/test/base", "clickhouse/test-base"),
di.DockerImage("docker/docs/builder", "clickhouse/docs-builder"),
di.DockerImage("docker/test/stateless", "clickhouse/stateless-test"),
di.DockerImage(
"docker/test/integration/base", "clickhouse/integration-test"
),
di.DockerImage("docker/test/fuzzer", "clickhouse/fuzzer"),
di.DockerImage(
"docker/test/keeper-jepsen", "clickhouse/keeper-jepsen-test"
),
di.DockerImage("docker/docs/check", "clickhouse/docs-check"),
di.DockerImage("docker/docs/release", "clickhouse/docs-release"),
di.DockerImage("docker/test/stateful", "clickhouse/stateful-test"),
di.DockerImage("docker/test/unit", "clickhouse/unit-test"),
di.DockerImage("docker/test/stress", "clickhouse/stress-test"),
}
images = sorted(
list(di.get_changed_docker_images(pr_info, "/", self.docker_images_path))
)
self.maxDiff = None
expected = sorted(
[
di.DockerImage("docker/test/base", "clickhouse/test-base"),
di.DockerImage("docker/docs/builder", "clickhouse/docs-builder"),
di.DockerImage(
"docker/test/stateless",
"clickhouse/stateless-test",
"clickhouse/test-base",
),
di.DockerImage(
"docker/test/integration/base",
"clickhouse/integration-test",
"clickhouse/test-base",
),
di.DockerImage(
"docker/test/fuzzer", "clickhouse/fuzzer", "clickhouse/test-base"
),
di.DockerImage(
"docker/test/keeper-jepsen",
"clickhouse/keeper-jepsen-test",
"clickhouse/test-base",
),
di.DockerImage(
"docker/docs/check",
"clickhouse/docs-check",
"clickhouse/docs-builder",
),
di.DockerImage(
"docker/docs/release",
"clickhouse/docs-release",
"clickhouse/docs-builder",
),
di.DockerImage(
"docker/test/stateful",
"clickhouse/stateful-test",
"clickhouse/stateless-test",
),
di.DockerImage(
"docker/test/unit",
"clickhouse/unit-test",
"clickhouse/stateless-test",
),
di.DockerImage(
"docker/test/stress",
"clickhouse/stress-test",
"clickhouse/stateful-test",
),
]
)
self.assertEqual(images, expected)
def test_gen_version(self):


@ -366,6 +366,43 @@ def test_hdfs_directory_not_exist(started_cluster):
node1.query(ddl)
assert "" == node1.query("select * from HDFSStorageWithNotExistDir")
def test_overwrite(started_cluster):
hdfs_api = started_cluster.hdfs_api
table_function = f"hdfs('hdfs://hdfs1:9000/data', 'Parquet', 'a Int32, b String')"
node1.query(f"create table test_overwrite as {table_function}")
node1.query(f"insert into test_overwrite select number, randomString(100) from numbers(5)")
node1.query_and_get_error(f"insert into test_overwrite select number, randomString(100) FROM numbers(10)")
node1.query(f"insert into test_overwrite select number, randomString(100) from numbers(10) settings hdfs_truncate_on_insert=1")
result = node1.query(f"select count() from test_overwrite")
assert(int(result) == 10)
def test_multiple_inserts(started_cluster):
hdfs_api = started_cluster.hdfs_api
table_function = f"hdfs('hdfs://hdfs1:9000/data_multiple_inserts', 'Parquet', 'a Int32, b String')"
node1.query(f"create table test_multiple_inserts as {table_function}")
node1.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(10)")
node1.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(20) settings hdfs_create_new_file_on_insert=1")
node1.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(30) settings hdfs_create_new_file_on_insert=1")
result = node1.query(f"select count() from test_multiple_inserts")
assert(int(result) == 60)
result = node1.query(f"drop table test_multiple_inserts")
table_function = f"hdfs('hdfs://hdfs1:9000/data_multiple_inserts.gz', 'Parquet', 'a Int32, b String')"
node1.query(f"create table test_multiple_inserts as {table_function}")
node1.query(f"insert into test_multiple_inserts select number, randomString(100) FROM numbers(10)")
node1.query(f"insert into test_multiple_inserts select number, randomString(100) FROM numbers(20) settings hdfs_create_new_file_on_insert=1")
node1.query(f"insert into test_multiple_inserts select number, randomString(100) FROM numbers(30) settings hdfs_create_new_file_on_insert=1")
result = node1.query(f"select count() from test_multiple_inserts")
assert(int(result) == 60)
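For reference, the behaviour these two tests pin down, shown as a condensed sketch reusing the `node1` fixture above (the literal values are arbitrary): inserting into an existing HDFS file is rejected by default, `hdfs_truncate_on_insert=1` replaces the file, and `hdfs_create_new_file_on_insert=1` writes to a new, suffixed file next to it.

    node1.query_and_get_error("insert into test_overwrite select 1, 'x'")  # target file exists, so the insert fails
    node1.query("insert into test_overwrite select 1, 'x' settings hdfs_truncate_on_insert=1")  # replace the file
    node1.query("insert into test_overwrite select 1, 'x' settings hdfs_create_new_file_on_insert=1")  # write a new file alongside it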
def test_format_detection(started_cluster):
node1.query(f"create table arrow_table (x UInt64) engine=HDFS('hdfs://hdfs1:9000/data.arrow')")
node1.query(f"insert into arrow_table select 1")


@ -10,6 +10,11 @@
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_parquet>
<s3_parquet_gz>
<url>http://minio1:9001/root/test_parquet_gz</url>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_parquet_gz>
<s3_orc>
<url>http://minio1:9001/root/test_orc</url>
<access_key_id>minio</access_key_id>


@ -136,7 +136,7 @@ def test_put(started_cluster, maybe_auth, positive, compression):
values_csv = "1,2,3\n3,2,1\n78,43,45\n"
filename = "test.csv"
put_query = f"""insert into table function s3('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{filename}',
{maybe_auth}'CSV', '{table_format}', {compression}) values {values}"""
{maybe_auth}'CSV', '{table_format}', {compression}) values settings s3_truncate_on_insert=1 {values}"""
try:
run_query(instance, put_query)
@ -298,7 +298,7 @@ def test_put_csv(started_cluster, maybe_auth, positive):
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
filename = "test.csv"
put_query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') format CSV".format(
put_query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') format CSV settings s3_truncate_on_insert=1".format(
started_cluster.minio_ip, MINIO_INTERNAL_PORT, bucket, filename, maybe_auth, table_format)
csv_data = "8,9,16\n11,18,13\n22,14,2\n"
@ -322,7 +322,7 @@ def test_put_get_with_redirect(started_cluster):
values = "(1, 1, 1), (1, 1, 1), (11, 11, 11)"
values_csv = "1,1,1\n1,1,1\n11,11,11\n"
filename = "test.csv"
query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format(
query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values settings s3_truncate_on_insert=1 {}".format(
started_cluster.minio_redirect_host, started_cluster.minio_redirect_port, bucket, filename, table_format, values)
run_query(instance, query)
@ -350,12 +350,12 @@ def test_put_with_zero_redirect(started_cluster):
filename = "test.csv"
# Should work without redirect
query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format(
query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values settings s3_truncate_on_insert=1 {}".format(
started_cluster.minio_ip, MINIO_INTERNAL_PORT, bucket, filename, table_format, values)
run_query(instance, query)
# Should not work with redirect
query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format(
query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values settings s3_truncate_on_insert=1 {}".format(
started_cluster.minio_redirect_host, started_cluster.minio_redirect_port, bucket, filename, table_format, values)
exception_raised = False
try:
@ -805,13 +805,13 @@ def test_seekable_formats(started_cluster):
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
table_function = f"s3(s3_parquet, structure='a Int32, b String', format='Parquet')"
instance.query(f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000)")
instance.query(f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000) settings s3_truncate_on_insert=1")
result = instance.query(f"SELECT count() FROM {table_function}")
assert(int(result) == 5000000)
table_function = f"s3(s3_orc, structure='a Int32, b String', format='ORC')"
exec_query_with_retry(instance, f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000)")
exec_query_with_retry(instance, f"insert into table function {table_function} SELECT number, randomString(100) FROM numbers(5000000) settings s3_truncate_on_insert=1")
result = instance.query(f"SELECT count() FROM {table_function}")
assert(int(result) == 5000000)
@ -827,14 +827,14 @@ def test_seekable_formats_url(started_cluster):
instance = started_cluster.instances["dummy"]
table_function = f"s3(s3_parquet, structure='a Int32, b String', format='Parquet')"
instance.query(f"insert into table function {table_function} select number, randomString(100) from numbers(5000000)")
instance.query(f"insert into table function {table_function} select number, randomString(100) from numbers(5000000) settings s3_truncate_on_insert=1")
table_function = f"url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_parquet', 'Parquet', 'a Int32, b String')"
result = instance.query(f"SELECT count() FROM {table_function}")
assert(int(result) == 5000000)
table_function = f"s3(s3_orc, structure='a Int32, b String', format='ORC')"
exec_query_with_retry(instance, f"insert into table function {table_function} select number, randomString(100) from numbers(5000000)")
exec_query_with_retry(instance, f"insert into table function {table_function} select number, randomString(100) from numbers(5000000) settings s3_truncate_on_insert=1")
table_function = f"url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_orc', 'ORC', 'a Int32, b String')"
result = instance.query(f"SELECT count() FROM {table_function}")
@ -917,6 +917,48 @@ def test_empty_file(started_cluster):
assert(int(result) == 0)
def test_overwrite(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
table_function = f"s3(s3_parquet, structure='a Int32, b String', format='Parquet')"
instance.query(f"create table test_overwrite as {table_function}")
instance.query(f"truncate table test_overwrite")
instance.query(f"insert into test_overwrite select number, randomString(100) from numbers(50) settings s3_truncate_on_insert=1")
instance.query_and_get_error(f"insert into test_overwrite select number, randomString(100) from numbers(100)")
instance.query(f"insert into test_overwrite select number, randomString(100) from numbers(200) settings s3_truncate_on_insert=1")
result = instance.query(f"select count() from test_overwrite")
assert(int(result) == 200)
def test_create_new_files_on_insert(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
table_function = f"s3(s3_parquet, structure='a Int32, b String', format='Parquet')"
instance.query(f"create table test_multiple_inserts as {table_function}")
instance.query(f"truncate table test_multiple_inserts")
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(10) settings s3_truncate_on_insert=1")
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(20) settings s3_create_new_file_on_insert=1")
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(30) settings s3_create_new_file_on_insert=1")
result = instance.query(f"select count() from test_multiple_inserts")
assert(int(result) == 60)
instance.query(f"drop table test_multiple_inserts")
table_function = f"s3(s3_parquet_gz, structure='a Int32, b String', format='Parquet')"
instance.query(f"create table test_multiple_inserts as {table_function}")
instance.query(f"truncate table test_multiple_inserts")
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(10) settings s3_truncate_on_insert=1")
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(20) settings s3_create_new_file_on_insert=1")
instance.query(f"insert into test_multiple_inserts select number, randomString(100) from numbers(30) settings s3_create_new_file_on_insert=1")
result = instance.query(f"select count() from test_multiple_inserts")
assert(int(result) == 60)
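The same pair of settings exists for S3, which is what the two tests above exercise. A self-contained sketch of the decision they pin down (illustrative only; the interplay when both settings are enabled at once is not modelled here):

    def resolve_insert_target(key_exists: bool, truncate_on_insert: bool, create_new_file_on_insert: bool) -> str:
        # Mirrors the observable behaviour: writing to a fresh key always works,
        # overwriting or suffixing an existing key requires an explicit setting.
        if not key_exists:
            return "write the key"
        if truncate_on_insert:
            return "overwrite the key"          # s3_truncate_on_insert=1
        if create_new_file_on_insert:
            return "write a new suffixed key"   # s3_create_new_file_on_insert=1
        raise RuntimeError("inserting into an existing S3 key is rejected by default")

    assert resolve_insert_target(True, True, False) == "overwrite the key"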
def test_format_detection(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]


@ -1,11 +1,14 @@
-- Tags: no-parallel
-- ^^^^^^^^^^^ otherwise you may hit TOO_DEEP_RECURSION error during querying system.columns
DROP TABLE IF EXISTS merge1;
DROP TABLE IF EXISTS merge2;
CREATE TABLE IF NOT EXISTS merge1 (x UInt64) ENGINE = Merge(currentDatabase(), '^merge\\d$');
CREATE TABLE IF NOT EXISTS merge2 (x UInt64) ENGINE = Merge(currentDatabase(), '^merge\\d$');
SELECT * FROM merge1; -- { serverError 306 }
SELECT * FROM merge2; -- { serverError 306 }
SELECT * FROM merge1; -- { serverError TOO_DEEP_RECURSION }
SELECT * FROM merge2; -- { serverError TOO_DEEP_RECURSION }
DROP TABLE merge1;
DROP TABLE merge2;


@ -12,7 +12,6 @@ do
${CLICKHOUSE_CLIENT} --query "CREATE TABLE file (x UInt64) ENGINE = File(TSV, '${CLICKHOUSE_DATABASE}/${m}.tsv.${m}')"
${CLICKHOUSE_CLIENT} --query "TRUNCATE TABLE file"
${CLICKHOUSE_CLIENT} --query "INSERT INTO file SELECT * FROM numbers(1000000)"
sleep 1
${CLICKHOUSE_CLIENT} --query "SELECT count(), max(x) FROM file"
${CLICKHOUSE_CLIENT} --query "DROP TABLE file"
done


@ -99,14 +99,14 @@ SYSTEM RELOAD FUNCTION ['SYSTEM RELOAD FUNCTIONS','RELOAD FUNCTION','RELOAD FUNC
SYSTEM RELOAD EMBEDDED DICTIONARIES ['RELOAD EMBEDDED DICTIONARIES'] GLOBAL SYSTEM RELOAD
SYSTEM RELOAD [] \N SYSTEM
SYSTEM RESTART DISK ['SYSTEM RESTART DISK'] GLOBAL SYSTEM
SYSTEM MERGES ['SYSTEM STOP MERGES','SYSTEM START MERGES','STOP_MERGES','START MERGES'] TABLE SYSTEM
SYSTEM MERGES ['SYSTEM STOP MERGES','SYSTEM START MERGES','STOP MERGES','START MERGES'] TABLE SYSTEM
SYSTEM TTL MERGES ['SYSTEM STOP TTL MERGES','SYSTEM START TTL MERGES','STOP TTL MERGES','START TTL MERGES'] TABLE SYSTEM
SYSTEM FETCHES ['SYSTEM STOP FETCHES','SYSTEM START FETCHES','STOP FETCHES','START FETCHES'] TABLE SYSTEM
SYSTEM MOVES ['SYSTEM STOP MOVES','SYSTEM START MOVES','STOP MOVES','START MOVES'] TABLE SYSTEM
SYSTEM DISTRIBUTED SENDS ['SYSTEM STOP DISTRIBUTED SENDS','SYSTEM START DISTRIBUTED SENDS','STOP DISTRIBUTED SENDS','START DISTRIBUTED SENDS'] TABLE SYSTEM SENDS
SYSTEM REPLICATED SENDS ['SYSTEM STOP REPLICATED SENDS','SYSTEM START REPLICATED SENDS','STOP_REPLICATED_SENDS','START REPLICATED SENDS'] TABLE SYSTEM SENDS
SYSTEM REPLICATED SENDS ['SYSTEM STOP REPLICATED SENDS','SYSTEM START REPLICATED SENDS','STOP REPLICATED SENDS','START REPLICATED SENDS'] TABLE SYSTEM SENDS
SYSTEM SENDS ['SYSTEM STOP SENDS','SYSTEM START SENDS','STOP SENDS','START SENDS'] \N SYSTEM
SYSTEM REPLICATION QUEUES ['SYSTEM STOP REPLICATION QUEUES','SYSTEM START REPLICATION QUEUES','STOP_REPLICATION_QUEUES','START REPLICATION QUEUES'] TABLE SYSTEM
SYSTEM REPLICATION QUEUES ['SYSTEM STOP REPLICATION QUEUES','SYSTEM START REPLICATION QUEUES','STOP REPLICATION QUEUES','START REPLICATION QUEUES'] TABLE SYSTEM
SYSTEM DROP REPLICA ['DROP REPLICA'] TABLE SYSTEM
SYSTEM SYNC REPLICA ['SYNC REPLICA'] TABLE SYSTEM
SYSTEM RESTART REPLICA ['RESTART REPLICA'] TABLE SYSTEM


@ -0,0 +1,45 @@
Native
9999
99999
999999
2499999
Values
9999
99999
999999
2499999
JSONCompactEachRow
9999
99999
999999
2499999
TSKV
9999
99999
999999
2499999
TSV
9999
99999
999999
2499999
CSV
9999
99999
999999
2499999
JSONEachRow
9999
99999
999999
2499999
JSONCompactEachRow
9999
99999
999999
2499999
JSONStringsEachRow
9999
99999
999999
2499999


@ -0,0 +1,21 @@
#!/usr/bin/env bash
# Tags: no-parallel
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
for format in Native Values JSONCompactEachRow TSKV TSV CSV JSONEachRow JSONCompactEachRow JSONStringsEachRow
do
echo $format
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS file"
${CLICKHOUSE_CLIENT} --query "CREATE TABLE file (x UInt64) ENGINE = File($format, '${CLICKHOUSE_DATABASE}/data.$format.lz4')"
for size in 10000 100000 1000000 2500000
do
${CLICKHOUSE_CLIENT} --query "TRUNCATE TABLE file"
${CLICKHOUSE_CLIENT} --query "INSERT INTO file SELECT * FROM numbers($size)"
${CLICKHOUSE_CLIENT} --query "SELECT max(x) FROM file"
done
done
${CLICKHOUSE_CLIENT} --query "DROP TABLE file"
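The loop above relies on the File engine inferring the compression codec from the trailing extension of the data path, so `data.$format.lz4` is written and read back LZ4-compressed without any explicit setting. A rough sketch of that inference (the helper name and the exact extension set are assumptions):

    def compression_from_path(path: str) -> str:
        # A few of the extensions recognised by extension-based detection;
        # anything else is treated as uncompressed.
        known = {"gz", "br", "xz", "zst", "lz4", "bz2"}
        ext = path.rsplit(".", 1)[-1].lower()
        return ext if ext in known else "none"

    assert compression_from_path("data.Native.lz4") == "lz4"
    assert compression_from_path("data.Native") == "none"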


@ -0,0 +1,100 @@
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19


@ -0,0 +1,39 @@
-- Tags: no-fasttest, no-parallel
drop table if exists test;
create table test (number UInt64) engine=File('Parquet');
insert into test select * from numbers(10);
insert into test select * from numbers(10, 10); -- { serverError CANNOT_APPEND_TO_FILE }
insert into test select * from numbers(10, 10) settings engine_file_allow_create_multiple_files=1;
select * from test order by number;
truncate table test;
drop table test;
create table test (number UInt64) engine=File('Parquet', 'test_02155/test1/data.Parquet');
insert into test select * from numbers(10) settings engine_file_truncate_on_insert=1;
insert into test select * from numbers(10, 10); -- { serverError CANNOT_APPEND_TO_FILE }
insert into test select * from numbers(10, 10) settings engine_file_allow_create_multiple_files=1;
select * from test order by number;
drop table test;
insert into table function file(concat(currentDatabase(), '/test2/data.Parquet'), 'Parquet', 'number UInt64') select * from numbers(10) settings engine_file_truncate_on_insert=1;
insert into table function file(concat(currentDatabase(), '/test2/data.Parquet'), 'Parquet', 'number UInt64') select * from numbers(10, 10); -- { serverError CANNOT_APPEND_TO_FILE }
insert into table function file(concat(currentDatabase(), '/test2/data.Parquet'), 'Parquet', 'number UInt64') select * from numbers(10, 10) settings engine_file_allow_create_multiple_files=1;
select * from file(concat(currentDatabase(), '/test2/data.Parquet'), 'Parquet', 'number UInt64');
select * from file(concat(currentDatabase(), '/test2/data.1.Parquet'), 'Parquet', 'number UInt64');
create table test (number UInt64) engine=File('Parquet', 'test_02155/test3/data.Parquet.gz');
insert into test select * from numbers(10) settings engine_file_truncate_on_insert=1;
;
insert into test select * from numbers(10, 10); -- { serverError CANNOT_APPEND_TO_FILE }
insert into test select * from numbers(10, 10) settings engine_file_allow_create_multiple_files=1;
select * from test order by number;
drop table test;
insert into table function file(concat(currentDatabase(), '/test4/data.Parquet.gz'), 'Parquet', 'number UInt64') select * from numbers(10) settings engine_file_truncate_on_insert=1;
insert into table function file(concat(currentDatabase(), '/test4/data.Parquet.gz'), 'Parquet', 'number UInt64') select * from numbers(10, 10); -- { serverError CANNOT_APPEND_TO_FILE }
insert into table function file(concat(currentDatabase(), '/test4/data.Parquet.gz'), 'Parquet', 'number UInt64') select * from numbers(10, 10) settings engine_file_allow_create_multiple_files=1;
select * from file(concat(currentDatabase(), '/test4/data.Parquet.gz'), 'Parquet', 'number UInt64');
select * from file(concat(currentDatabase(), '/test4/data.1.Parquet.gz'), 'Parquet', 'number UInt64');
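The final selects from `data.1.Parquet` and `data.1.Parquet.gz` pin down the naming scheme: with `engine_file_allow_create_multiple_files=1`, each additional insert goes to a new file whose counter is placed before the format (and compression) extension. A small sketch of that observable pattern (`next_file_name` is an illustrative helper, not the server implementation):

    def next_file_name(base: str, index: int) -> str:
        # "data.Parquet.gz" with index 1 becomes "data.1.Parquet.gz".
        parts = base.split(".")
        return ".".join([parts[0], str(index)] + parts[1:])

    assert next_file_name("data.Parquet", 1) == "data.1.Parquet"
    assert next_file_name("data.Parquet.gz", 1) == "data.1.Parquet.gz"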


@ -1,5 +1,5 @@
-- Tags: no-fasttest
insert into table function file('data.avro', 'Avro', 'x UInt64') select * from numbers(10);
insert into table function file('data.avro', 'Avro', 'x UInt64') select * from numbers(10);
insert into table function file('data.avro', 'Avro', 'x UInt64') select * from numbers(10);
-- Tags: no-fasttest, no-parallel
insert into table function file('data.avro', 'Parquet', 'x UInt64') select * from numbers(10);
insert into table function file('data.avro', 'Parquet', 'x UInt64') select * from numbers(10); -- { serverError CANNOT_APPEND_TO_FILE }
insert into table function file('data.avro', 'Parquet', 'x UInt64') select * from numbers(10); -- { serverError CANNOT_APPEND_TO_FILE }
select 'OK';


@ -27,7 +27,6 @@ issue_17653 = "https://github.com/ClickHouse/ClickHouse/issues/17653"
issue_17655 = "https://github.com/ClickHouse/ClickHouse/issues/17655"
issue_17766 = "https://github.com/ClickHouse/ClickHouse/issues/17766"
issue_18110 = "https://github.com/ClickHouse/ClickHouse/issues/18110"
issue_18206 = "https://github.com/ClickHouse/ClickHouse/issues/18206"
issue_21083 = "https://github.com/ClickHouse/ClickHouse/issues/21083"
issue_21084 = "https://github.com/ClickHouse/ClickHouse/issues/21084"
issue_25413 = "https://github.com/ClickHouse/ClickHouse/issues/25413"
@ -122,20 +121,6 @@ xfails = {
[(Fail, issue_17655)],
"privileges/public tables/sensitive tables":
[(Fail, issue_18110)],
"privileges/system merges/:/:/:/:/SYSTEM:":
[(Fail, issue_18206)],
"privileges/system ttl merges/:/:/:/:/SYSTEM:":
[(Fail, issue_18206)],
"privileges/system moves/:/:/:/:/SYSTEM:":
[(Fail, issue_18206)],
"privileges/system sends/:/:/:/:/SYSTEM:":
[(Fail, issue_18206)],
"privileges/system fetches/:/:/:/:/SYSTEM:":
[(Fail, issue_18206)],
"privileges/system restart replica/:/:/:/:/SYSTEM:":
[(Fail, issue_18206)],
"privileges/system replication queues/:/:/:/:/SYSTEM:":
[(Fail, issue_18206)],
"privileges/: row policy/nested live:":
[(Fail, issue_21083)],
"privileges/: row policy/nested mat:":


@ -184,7 +184,9 @@ tables_with_database_column=(
tests_with_database_column=( $(
find $ROOT_PATH/tests/queries -iname '*.sql' -or -iname '*.sh' -or -iname '*.py' -or -iname '*.j2' |
grep -vP $EXCLUDE_DIRS |
xargs grep --with-filename $(printf -- "-e %s " "${tables_with_database_column[@]}") | cut -d: -f1 | sort -u
xargs grep --with-filename $(printf -- "-e %s " "${tables_with_database_column[@]}") |
grep -v -e ':--' -e ':#' |
cut -d: -f1 | sort -u
) )
for test_case in "${tests_with_database_column[@]}"; do
grep -qE database.*currentDatabase "$test_case" || {