commit 38db3313be
Merge branch 'master' into minor-cgroup-improvements
@@ -22,11 +22,10 @@ curl https://clickhouse.com/ | sh

 ## Upcoming Events

-* [**v23.5 Release Webinar**](https://clickhouse.com/company/events/v23-5-release-webinar?utm_source=github&utm_medium=social&utm_campaign=release-webinar-2023-05) - May 31 - 23.5 is rapidly approaching. Original creator, co-founder, and CTO of ClickHouse Alexey Milovidov will walk us through the highlights of the release.
-* [**ClickHouse Meetup in Barcelona**](https://www.meetup.com/clickhouse-barcelona-user-group/events/292892669) - May 25
-* [**ClickHouse Meetup in London**](https://www.meetup.com/clickhouse-london-user-group/events/292892824) - May 25
+* [**v23.5 Release Webinar**](https://clickhouse.com/company/events/v23-5-release-webinar?utm_source=github&utm_medium=social&utm_campaign=release-webinar-2023-05) - Jun 8 - 23.5 is rapidly approaching. Original creator, co-founder, and CTO of ClickHouse Alexey Milovidov will walk us through the highlights of the release.
+* [**ClickHouse Meetup in Bangalore**](https://www.meetup.com/clickhouse-bangalore-user-group/events/293740066/) - Jun 7
+* [**ClickHouse Meetup in San Francisco**](https://www.meetup.com/clickhouse-silicon-valley-meetup-group/events/293426725/) - Jun 7
+* [**ClickHouse Meetup in Stockholm**](https://www.meetup.com/clickhouse-berlin-user-group/events/292892466) - Jun 13

 Also, keep an eye out for upcoming meetups in Amsterdam, Boston, NYC, Beijing, and Toronto. Somewhere else you want us to be? Please feel free to reach out to tyler <at> clickhouse <dot> com.
@@ -1205,6 +1205,56 @@ private:
     static std::string rewriteAggregateFunctionNameIfNeeded(const std::string & aggregate_function_name, const ContextPtr & context);

+    static std::optional<JoinTableSide> getColumnSideFromJoinTree(const QueryTreeNodePtr & resolved_identifier, const JoinNode & join_node)
+    {
+        if (resolved_identifier->getNodeType() == QueryTreeNodeType::CONSTANT)
+            return {};
+
+        if (resolved_identifier->getNodeType() == QueryTreeNodeType::FUNCTION)
+        {
+            const auto & resolved_function = resolved_identifier->as<FunctionNode &>();
+
+            const auto & argument_nodes = resolved_function.getArguments().getNodes();
+
+            std::optional<JoinTableSide> result;
+            for (const auto & argument_node : argument_nodes)
+            {
+                auto table_side = getColumnSideFromJoinTree(argument_node, join_node);
+                if (table_side && result && *table_side != *result)
+                {
+                    throw Exception(ErrorCodes::AMBIGUOUS_IDENTIFIER,
+                        "Ambiguous identifier {}. In scope {}",
+                        resolved_identifier->formatASTForErrorMessage(),
+                        join_node.formatASTForErrorMessage());
+                }
+                if (table_side)
+                    result = *table_side;
+            }
+            return result;
+        }
+
+        const auto * column_src = resolved_identifier->as<ColumnNode &>().getColumnSource().get();
+
+        if (join_node.getLeftTableExpression().get() == column_src)
+            return JoinTableSide::Left;
+        if (join_node.getRightTableExpression().get() == column_src)
+            return JoinTableSide::Right;
+        return {};
+    }
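For intuition, here is a minimal self-contained C++ sketch of the recursion above, using toy types rather than the Analyzer's real QueryTreeNode API: walk an expression, merge the join sides of its column leaves, and reject expressions that mix both sides.

    #include <iostream>
    #include <memory>
    #include <optional>
    #include <stdexcept>
    #include <vector>

    enum class Side { Left, Right };

    struct Expr
    {
        std::optional<Side> column_side;              // set for leaf columns
        std::vector<std::shared_ptr<Expr>> children;  // set for functions
    };

    std::optional<Side> sideOf(const Expr & e)
    {
        if (e.children.empty())
            return e.column_side;
        std::optional<Side> result;
        for (const auto & child : e.children)
        {
            auto side = sideOf(*child);
            if (side && result && *side != *result)
                throw std::runtime_error("ambiguous identifier: mixes both join sides");
            if (side)
                result = side;
        }
        return result;
    }

    int main()
    {
        auto l = std::make_shared<Expr>(Expr{Side::Left, {}});
        auto r = std::make_shared<Expr>(Expr{Side::Right, {}});
        std::cout << (sideOf(Expr{{}, {l, l}}) == Side::Left) << '\n';  // 1: pure left expression
        try { sideOf(Expr{{}, {l, r}}); }                               // throws: uses both sides
        catch (const std::exception & ex) { std::cout << ex.what() << '\n'; }
    }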
+
+    static void convertJoinedColumnTypeToNullIfNeeded(QueryTreeNodePtr & resolved_identifier, const JoinKind & join_kind, std::optional<JoinTableSide> resolved_side)
+    {
+        if (resolved_identifier->getNodeType() == QueryTreeNodeType::COLUMN &&
+            JoinCommon::canBecomeNullable(resolved_identifier->getResultType()) &&
+            (isFull(join_kind) ||
+            (isLeft(join_kind) && resolved_side && *resolved_side == JoinTableSide::Right) ||
+            (isRight(join_kind) && resolved_side && *resolved_side == JoinTableSide::Left)))
+        {
+            auto & resolved_column = resolved_identifier->as<ColumnNode &>();
+            resolved_column.setColumnType(makeNullableOrLowCardinalityNullable(resolved_column.getColumnType()));
+        }
+    }
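The condition above encodes a simple rule: under join_use_nulls, a column becomes Nullable exactly when its side of the JOIN can produce missing rows. A tiny self-contained sketch of that decision table (toy enums, not the real JoinKind/JoinTableSide types):

    #include <iostream>

    enum class Kind { Inner, Left, Right, Full };
    enum class Side { Left, Right };

    // FULL JOIN: both sides can be missing; LEFT JOIN: only right-side columns;
    // RIGHT JOIN: only left-side columns; INNER JOIN: neither.
    bool becomesNullable(Kind join_kind, Side column_side)
    {
        return join_kind == Kind::Full
            || (join_kind == Kind::Left && column_side == Side::Right)
            || (join_kind == Kind::Right && column_side == Side::Left);
    }

    int main()
    {
        std::cout << becomesNullable(Kind::Left, Side::Right) << '\n';  // 1
        std::cout << becomesNullable(Kind::Left, Side::Left) << '\n';   // 0
        std::cout << becomesNullable(Kind::Full, Side::Left) << '\n';   // 1
    }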

     /// Resolve identifier functions

     static QueryTreeNodePtr tryResolveTableIdentifierFromDatabaseCatalog(const Identifier & table_identifier, ContextPtr context);
@@ -2982,6 +3032,7 @@ QueryTreeNodePtr QueryAnalyzer::tryResolveIdentifierFromJoin(const IdentifierLoo
     QueryTreeNodePtr resolved_identifier;

     JoinKind join_kind = from_join_node.getKind();
+    bool join_use_nulls = scope.context->getSettingsRef().join_use_nulls;

     if (left_resolved_identifier && right_resolved_identifier)
     {
@@ -3027,19 +3078,31 @@ QueryTreeNodePtr QueryAnalyzer::tryResolveIdentifierFromJoin(const IdentifierLoo
              *
              * Otherwise we prefer column from left table.
              */
-            if (identifier_path_part == right_column_source_alias)
-                return right_resolved_identifier;
-            else if (!left_column_source_alias.empty() &&
-                right_column_source_alias.empty() &&
-                identifier_path_part != left_column_source_alias)
-                return right_resolved_identifier;
+            bool column_resolved_using_right_alias = identifier_path_part == right_column_source_alias;
+            bool column_resolved_without_using_left_alias = !left_column_source_alias.empty()
+                && right_column_source_alias.empty()
+                && identifier_path_part != left_column_source_alias;
+            if (column_resolved_using_right_alias || column_resolved_without_using_left_alias)
+            {
+                resolved_side = JoinTableSide::Right;
+                resolved_identifier = right_resolved_identifier;
+            }
+            else
+            {
+                resolved_side = JoinTableSide::Left;
+                resolved_identifier = left_resolved_identifier;
+            }
         }
+        else
+        {
+            resolved_side = JoinTableSide::Left;
+            resolved_identifier = left_resolved_identifier;
+        }
-
-        return left_resolved_identifier;
     }
     else if (scope.joins_count == 1 && scope.context->getSettingsRef().single_join_prefer_left_table)
     {
-        return left_resolved_identifier;
+        resolved_side = JoinTableSide::Left;
+        resolved_identifier = left_resolved_identifier;
     }
     else
     {
@@ -3092,17 +3155,10 @@ QueryTreeNodePtr QueryAnalyzer::tryResolveIdentifierFromJoin(const IdentifierLoo
     if (join_node_in_resolve_process || !resolved_identifier)
         return resolved_identifier;

-    bool join_use_nulls = scope.context->getSettingsRef().join_use_nulls;
-
-    if (join_use_nulls &&
-        resolved_identifier->getNodeType() == QueryTreeNodeType::COLUMN &&
-        (isFull(join_kind) ||
-        (isLeft(join_kind) && resolved_side && *resolved_side == JoinTableSide::Right) ||
-        (isRight(join_kind) && resolved_side && *resolved_side == JoinTableSide::Left)))
+    if (join_use_nulls)
     {
         resolved_identifier = resolved_identifier->clone();
-        auto & resolved_column = resolved_identifier->as<ColumnNode &>();
-        resolved_column.setColumnType(makeNullableOrLowCardinalityNullable(resolved_column.getColumnType()));
+        convertJoinedColumnTypeToNullIfNeeded(resolved_identifier, join_kind, resolved_side);
     }

     return resolved_identifier;
@@ -4001,6 +4057,27 @@ ProjectionNames QueryAnalyzer::resolveMatcher(QueryTreeNodePtr & matcher_node, I
     else
         matched_expression_nodes_with_names = resolveUnqualifiedMatcher(matcher_node, scope);

+    if (scope.context->getSettingsRef().join_use_nulls)
+    {
+        /** If we are resolving a matcher that came from the result of a JOIN and `join_use_nulls` is set,
+          * we need to convert the joined column type to Nullable.
+          * We take the nearest JoinNode to check to which table the column belongs,
+          * because for LEFT/RIGHT join we convert only the corresponding side.
+          */
+        const auto * nearest_query_scope = scope.getNearestQueryScope();
+        const QueryNode * nearest_scope_query_node = nearest_query_scope ? nearest_query_scope->scope_node->as<QueryNode>() : nullptr;
+        const QueryTreeNodePtr & nearest_scope_join_tree = nearest_scope_query_node ? nearest_scope_query_node->getJoinTree() : nullptr;
+        const JoinNode * nearest_scope_join_node = nearest_scope_join_tree ? nearest_scope_join_tree->as<JoinNode>() : nullptr;
+        if (nearest_scope_join_node)
+        {
+            for (auto & [node, node_name] : matched_expression_nodes_with_names)
+            {
+                auto join_identifier_side = getColumnSideFromJoinTree(node, *nearest_scope_join_node);
+                convertJoinedColumnTypeToNullIfNeeded(node, nearest_scope_join_node->getKind(), join_identifier_side);
+            }
+        }
+    }
+
     std::unordered_map<const IColumnTransformerNode *, std::unordered_set<std::string>> strict_transformer_to_used_column_names;
     for (const auto & transformer : matcher_node_typed.getColumnTransformers().getNodes())
     {
@@ -29,10 +29,15 @@ protected:
         /// Make encrypted disk.
         auto settings = std::make_unique<DiskEncryptedSettings>();
         settings->wrapped_disk = local_disk;
-        settings->current_algorithm = FileEncryption::Algorithm::AES_128_CTR;
-        settings->keys[0] = "1234567890123456";
-        settings->current_key_id = 0;
         settings->disk_path = "encrypted/";

+        settings->current_algorithm = FileEncryption::Algorithm::AES_128_CTR;
+        String key = "1234567890123456";
+        UInt128 fingerprint = FileEncryption::calculateKeyFingerprint(key);
+        settings->all_keys[fingerprint] = key;
+        settings->current_key = key;
+        settings->current_key_fingerprint = fingerprint;

         encrypted_disk = std::make_shared<DiskEncrypted>("encrypted_disk", std::move(settings), true);
     }
@@ -232,12 +232,28 @@ void Connection::disconnect()
     maybe_compressed_out = nullptr;
     in = nullptr;
     last_input_packet_type.reset();
-    out = nullptr; // can write to socket
+    std::exception_ptr finalize_exception;
+    try
+    {
+        // finalize() can write to socket and throw an exception.
+        if (out)
+            out->finalize();
+    }
+    catch (...)
+    {
+        /// Don't throw an exception here: it would leave the Connection in an invalid state.
+        finalize_exception = std::current_exception();
+    }
+    out = nullptr;
+
     if (socket)
         socket->close();
     socket = nullptr;
     connected = false;
     nonce.reset();
+
+    if (finalize_exception)
+        std::rethrow_exception(finalize_exception);
 }
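The shape of this change is a reusable pattern: capture the exception from a throwing cleanup step, finish putting the object into a consistent state, then rethrow. A minimal self-contained sketch with a hypothetical resource type (not the real Connection class):

    #include <exception>
    #include <iostream>
    #include <stdexcept>

    struct Resource
    {
        bool connected = true;

        static void finalizeMayThrow() { throw std::runtime_error("socket write failed"); }

        void disconnect()
        {
            std::exception_ptr finalize_exception;
            try
            {
                finalizeMayThrow();
            }
            catch (...)
            {
                // Don't rethrow yet: the remaining cleanup must still run.
                finalize_exception = std::current_exception();
            }

            connected = false;  // object is in a valid state either way

            if (finalize_exception)
                std::rethrow_exception(finalize_exception);
        }
    };

    int main()
    {
        Resource r;
        try { r.disconnect(); }
        catch (const std::exception & e) { std::cout << e.what() << '\n'; }
        std::cout << "connected: " << r.connected << '\n';  // 0
    }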

@@ -138,7 +138,7 @@ void FileChecker::save() const
     std::string tmp_files_info_path = parentPath(files_info_path) + "tmp_" + fileName(files_info_path);

     {
-        std::unique_ptr<WriteBuffer> out = disk ? disk->writeFile(tmp_files_info_path) : std::make_unique<WriteBufferFromFile>(tmp_files_info_path);
+        std::unique_ptr<WriteBufferFromFileBase> out = disk ? disk->writeFile(tmp_files_info_path) : std::make_unique<WriteBufferFromFile>(tmp_files_info_path);

         /// So complex JSON structure - for compatibility with the old format.
         writeCString("{\"clickhouse\":{", *out);
@@ -157,7 +157,9 @@ void FileChecker::save() const
     }

     writeCString("}}", *out);
     out->next();

+    out->sync();
+    out->finalize();
 }

 if (disk)
@@ -19,7 +19,6 @@ namespace ErrorCodes
 {
     extern const int BAD_ARGUMENTS;
-    extern const int INCORRECT_DISK_INDEX;
     extern const int DATA_ENCRYPTION_ERROR;
     extern const int NOT_IMPLEMENTED;
 }
@@ -42,87 +41,201 @@ namespace
         }
     }

+    /// Reads encryption keys from the configuration.
+    void getKeysFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_prefix,
+                           std::map<UInt64, String> & out_keys_by_id, Strings & out_keys_without_id)
+    {
+        Strings config_keys;
+        config.keys(config_prefix, config_keys);
+
+        for (const std::string & config_key : config_keys)
+        {
+            String key;
+            std::optional<UInt64> key_id;
+
+            if ((config_key == "key") || config_key.starts_with("key["))
+            {
+                String key_path = config_prefix + "." + config_key;
+                key = config.getString(key_path);
+                String key_id_path = key_path + "[@id]";
+                if (config.has(key_id_path))
+                    key_id = config.getUInt64(key_id_path);
+            }
+            else if ((config_key == "key_hex") || config_key.starts_with("key_hex["))
+            {
+                String key_path = config_prefix + "." + config_key;
+                key = unhexKey(config.getString(key_path));
+                String key_id_path = key_path + "[@id]";
+                if (config.has(key_id_path))
+                    key_id = config.getUInt64(key_id_path);
+            }
+            else
+                continue;
+
+            if (key_id)
+            {
+                if (!out_keys_by_id.contains(*key_id))
+                    out_keys_by_id[*key_id] = key;
+                else
+                    throw Exception(ErrorCodes::BAD_ARGUMENTS, "Multiple keys specified for same ID {}", *key_id);
+            }
+            else
+                out_keys_without_id.push_back(key);
+        }
+
+        if (out_keys_by_id.empty() && out_keys_without_id.empty())
+            throw Exception(ErrorCodes::BAD_ARGUMENTS, "No encryption keys found");
+
+        if (out_keys_by_id.empty() && (out_keys_without_id.size() == 1))
+        {
+            out_keys_by_id[0] = out_keys_without_id.front();
+            out_keys_without_id.clear();
+        }
+    }
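The last step above normalizes configurations that define a single key without an explicit id. A small self-contained sketch of that promotion (std::map/std::vector stand in for the real config plumbing):

    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <string>
    #include <vector>

    int main()
    {
        std::map<std::uint64_t, std::string> keys_by_id;
        std::vector<std::string> keys_without_id = {"1234567890123456"};

        // A single id-less key is promoted to id 0 so later code can treat all
        // keys uniformly; with several id-less keys the mapping would be
        // ambiguous, so they are left as-is.
        if (keys_by_id.empty() && keys_without_id.size() == 1)
        {
            keys_by_id[0] = keys_without_id.front();
            keys_without_id.clear();
        }

        std::cout << "key with id 0: " << keys_by_id.at(0) << '\n';
    }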
+
+    /// Reads the current encryption key from the configuration.
+    String getCurrentKeyFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_prefix,
+                                   const std::map<UInt64, String> & keys_by_id, const Strings & keys_without_id)
+    {
+        String key_path = config_prefix + ".current_key";
+        String key_hex_path = config_prefix + ".current_key_hex";
+        String key_id_path = config_prefix + ".current_key_id";
+
+        if (config.has(key_path) + config.has(key_hex_path) + config.has(key_id_path) > 1)
+            throw Exception(ErrorCodes::BAD_ARGUMENTS, "The current key is specified multiple times");
+
+        auto check_current_key_found = [&](const String & current_key_)
+        {
+            for (const auto & [_, key] : keys_by_id)
+            {
+                if (key == current_key_)
+                    return;
+            }
+            for (const auto & key : keys_without_id)
+            {
+                if (key == current_key_)
+                    return;
+            }
+            throw Exception(ErrorCodes::BAD_ARGUMENTS, "The current key is not found in keys");
+        };
+
+        if (config.has(key_path))
+        {
+            String current_key = config.getString(key_path);
+            check_current_key_found(current_key);
+            return current_key;
+        }
+        else if (config.has(key_hex_path))
+        {
+            String current_key = unhexKey(config.getString(key_hex_path));
+            check_current_key_found(current_key);
+            return current_key;
+        }
+        else if (config.has(key_id_path))
+        {
+            UInt64 current_key_id = config.getUInt64(key_id_path);
+            auto it = keys_by_id.find(current_key_id);
+            if (it == keys_by_id.end())
+                throw Exception(ErrorCodes::BAD_ARGUMENTS, "Not found a key with the current ID {}", current_key_id);
+            return it->second;
+        }
+        else if (keys_by_id.size() == 1 && keys_without_id.empty() && keys_by_id.begin()->first == 0)
+        {
+            /// There is only a single key defined with id=0, so we can choose it as current.
+            return keys_by_id.begin()->second;
+        }
+        else
+        {
+            throw Exception(ErrorCodes::BAD_ARGUMENTS, "The current key is not specified");
+        }
+    }
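One detail worth calling out: the "specified multiple times" check above sums three bools, relying on bool-to-int promotion to count how many of the mutually exclusive entries are present. A self-contained illustration:

    #include <iostream>

    int main()
    {
        bool has_current_key = true;
        bool has_current_key_hex = false;
        bool has_current_key_id = true;

        // Each bool promotes to 0 or 1, so the sum counts how many are set.
        if (has_current_key + has_current_key_hex + has_current_key_id > 1)
            std::cout << "error: the current key is specified multiple times\n";
    }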
+
+    /// Reads the current encryption algorithm from the configuration.
+    Algorithm getCurrentAlgorithmFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
+    {
+        String path = config_prefix + ".algorithm";
+        if (!config.has(path))
+            return DEFAULT_ENCRYPTION_ALGORITHM;
+        return parseAlgorithmFromString(config.getString(path));
+    }
+
+    /// Reads the name of a wrapped disk & the path on the wrapped disk and then finds that disk in a disk map.
+    void getDiskAndPathFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const DisksMap & map,
+                                  DiskPtr & out_disk, String & out_path)
+    {
+        String disk_name = config.getString(config_prefix + ".disk", "");
+        if (disk_name.empty())
+            throw Exception(
+                ErrorCodes::BAD_ARGUMENTS, "Name of the wrapped disk must not be empty. Encrypted disk is a wrapper over another disk");
+
+        auto disk_it = map.find(disk_name);
+        if (disk_it == map.end())
+            throw Exception(
+                ErrorCodes::BAD_ARGUMENTS, "The wrapped disk must have been announced earlier. No disk with name {}", disk_name);
+
+        out_disk = disk_it->second;
+
+        out_path = config.getString(config_prefix + ".path", "");
+        if (!out_path.empty() && (out_path.back() != '/'))
+            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Disk path must end with '/', but '{}' doesn't.", quoteString(out_path));
+    }
+
     /// Parses the settings of an encrypted disk from the configuration.
     std::unique_ptr<const DiskEncryptedSettings> parseDiskEncryptedSettings(
-        const String & name, const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const DisksMap & map)
+        const String & disk_name,
+        const Poco::Util::AbstractConfiguration & config,
+        const String & config_prefix,
+        const DisksMap & disk_map)
     {
         try
         {
             auto res = std::make_unique<DiskEncryptedSettings>();
-            res->current_algorithm = DEFAULT_ENCRYPTION_ALGORITHM;
-            if (config.has(config_prefix + ".algorithm"))
-                parseFromString(res->current_algorithm, config.getString(config_prefix + ".algorithm"));
-
-            Strings config_keys;
-            config.keys(config_prefix, config_keys);
-            for (const std::string & config_key : config_keys)
+            std::map<UInt64, String> keys_by_id;
+            Strings keys_without_id;
+            getKeysFromConfig(config, config_prefix, keys_by_id, keys_without_id);
+
+            for (const auto & [key_id, key] : keys_by_id)
             {
-                String key;
-                UInt64 key_id;
-
-                if ((config_key == "key") || config_key.starts_with("key["))
-                {
-                    key = config.getString(config_prefix + "." + config_key, "");
-                    key_id = config.getUInt64(config_prefix + "." + config_key + "[@id]", 0);
-                }
-                else if ((config_key == "key_hex") || config_key.starts_with("key_hex["))
-                {
-                    key = unhexKey(config.getString(config_prefix + "." + config_key, ""));
-                    key_id = config.getUInt64(config_prefix + "." + config_key + "[@id]", 0);
-                }
-                else
-                    continue;
-
-                if (res->keys.contains(key_id))
-                    throw Exception(ErrorCodes::BAD_ARGUMENTS, "Multiple keys have the same ID {}", key_id);
-                res->keys[key_id] = key;
+                auto fingerprint = calculateKeyFingerprint(key);
+                res->all_keys[fingerprint] = key;
+
+                /// Version 1 used key fingerprints based on the key id.
+                /// We have to add such fingerprints to the map too to support reading files encrypted by version 1.
+                auto v1_fingerprint = calculateV1KeyFingerprint(key, key_id);
+                res->all_keys[v1_fingerprint] = key;
             }

-            if (res->keys.empty())
-                throw Exception(ErrorCodes::BAD_ARGUMENTS, "No keys, an encrypted disk needs keys to work");
-
-            if (!config.has(config_prefix + ".current_key_id"))
+            for (const auto & key : keys_without_id)
             {
-                /// In case of multiple keys, current_key_id is mandatory.
-                if (res->keys.size() > 1)
-                    throw Exception(ErrorCodes::BAD_ARGUMENTS, "There are multiple keys in config. current_key_id is required");
-
-                /// If there is only one key with a non-zero ID, current_key_id should be defined.
-                if (res->keys.size() == 1 && !res->keys.contains(0))
-                    throw Exception(ErrorCodes::BAD_ARGUMENTS, "Config has one key with non zero id. current_key_id is required");
+                auto fingerprint = calculateKeyFingerprint(key);
+                res->all_keys[fingerprint] = key;
             }

-            res->current_key_id = config.getUInt64(config_prefix + ".current_key_id", 0);
-            if (!res->keys.contains(res->current_key_id))
-                throw Exception(ErrorCodes::BAD_ARGUMENTS, "Not found a key with the current ID {}", res->current_key_id);
-            FileEncryption::checkKeySize(res->current_algorithm, res->keys[res->current_key_id].size());
+            String current_key = getCurrentKeyFromConfig(config, config_prefix, keys_by_id, keys_without_id);
+            res->current_key = current_key;
+            res->current_key_fingerprint = calculateKeyFingerprint(current_key);

-            String wrapped_disk_name = config.getString(config_prefix + ".disk", "");
-            if (wrapped_disk_name.empty())
-                throw Exception(
-                    ErrorCodes::BAD_ARGUMENTS,
-                    "Name of the wrapped disk must not be empty. Encrypted disk is a wrapper over another disk");
+            res->current_algorithm = getCurrentAlgorithmFromConfig(config, config_prefix);

-            auto wrapped_disk_it = map.find(wrapped_disk_name);
-            if (wrapped_disk_it == map.end())
-                throw Exception(
-                    ErrorCodes::BAD_ARGUMENTS,
-                    "The wrapped disk must have been announced earlier. No disk with name {}",
-                    wrapped_disk_name);
-            res->wrapped_disk = wrapped_disk_it->second;
+            FileEncryption::checkKeySize(res->current_key.size(), res->current_algorithm);

-            res->disk_path = config.getString(config_prefix + ".path", "");
-            if (!res->disk_path.empty() && (res->disk_path.back() != '/'))
-                throw Exception(ErrorCodes::BAD_ARGUMENTS, "Disk path must end with '/', but '{}' doesn't.", quoteString(res->disk_path));
+            DiskPtr wrapped_disk;
+            String disk_path;
+            getDiskAndPathFromConfig(config, config_prefix, disk_map, wrapped_disk, disk_path);
+            res->wrapped_disk = wrapped_disk;
+            res->disk_path = disk_path;

             return res;
         }
         catch (Exception & e)
         {
-            e.addMessage("Disk " + name);
+            e.addMessage("Disk " + disk_name);
             throw;
         }
     }

     /// Reads the header of an encrypted file.
     FileEncryption::Header readHeader(ReadBufferFromFileBase & read_buffer)
     {
         try
@@ -138,24 +251,6 @@ namespace
         }
     }

-    String getKey(const String & path, const FileEncryption::Header & header, const DiskEncryptedSettings & settings)
-    {
-        auto it = settings.keys.find(header.key_id);
-        if (it == settings.keys.end())
-            throw Exception(
-                ErrorCodes::DATA_ENCRYPTION_ERROR,
-                "Not found a key with ID {} required to decipher file {}",
-                header.key_id,
-                quoteString(path));
-
-        String key = it->second;
-        if (calculateKeyHash(key) != header.key_hash)
-            throw Exception(
-                ErrorCodes::DATA_ENCRYPTION_ERROR, "Wrong key with ID {}, could not decipher file {}", header.key_id, quoteString(path));
-
-        return key;
-    }
-
     bool inline isSameDiskType(const IDisk & one, const IDisk & another)
     {
         return typeid(one) == typeid(another);
@@ -225,7 +320,7 @@ void DiskEncrypted::copy(const String & from_path, const std::shared_ptr<IDisk>
     {
         auto from_settings = current_settings.get();
         auto to_settings = to_disk_enc->current_settings.get();
-        if (from_settings->keys == to_settings->keys)
+        if (from_settings->all_keys == to_settings->all_keys)
         {
             /// Keys are the same so we can simply copy the encrypted file.
             auto wrapped_from_path = wrappedPath(from_path);
@@ -252,7 +347,7 @@ void DiskEncrypted::copyDirectoryContent(const String & from_dir, const std::sha
     {
         auto from_settings = current_settings.get();
         auto to_settings = to_disk_enc->current_settings.get();
-        if (from_settings->keys == to_settings->keys)
+        if (from_settings->all_keys == to_settings->all_keys)
        {
             /// Keys are the same so we can simply copy the encrypted file.
             auto wrapped_from_path = wrappedPath(from_dir);
@@ -293,7 +388,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskEncrypted::readFile(
     }
     auto encryption_settings = current_settings.get();
     FileEncryption::Header header = readHeader(*buffer);
-    String key = getKey(path, header, *encryption_settings);
+    String key = encryption_settings->findKeyByFingerprint(header.key_fingerprint, path);
     return std::make_unique<ReadBufferFromEncryptedFile>(settings.local_fs_buffer_size, std::move(buffer), key, header);
 }
@@ -38,39 +38,21 @@ FileEncryption::Header readHeader(ReadBufferFromFileBase & read_buffer)
     }
 }

-String getCurrentKey(const String & path, const DiskEncryptedSettings & settings)
+}
+
+String DiskEncryptedSettings::findKeyByFingerprint(UInt128 key_fingerprint, const String & path_for_logs) const
 {
-    auto it = settings.keys.find(settings.current_key_id);
-    if (it == settings.keys.end())
+    auto it = all_keys.find(key_fingerprint);
+    if (it == all_keys.end())
+    {
         throw Exception(
             ErrorCodes::DATA_ENCRYPTION_ERROR,
-            "Not found a key with the current ID {} required to cipher file {}",
-            settings.current_key_id,
-            quoteString(path));
+            "Not found an encryption key required to decipher file {}",
+            quoteString(path_for_logs));
+    }
     return it->second;
 }

-String getKey(const String & path, const FileEncryption::Header & header, const DiskEncryptedSettings & settings)
-{
-    auto it = settings.keys.find(header.key_id);
-    if (it == settings.keys.end())
-        throw Exception(
-            ErrorCodes::DATA_ENCRYPTION_ERROR,
-            "Not found a key with ID {} required to decipher file {}",
-            header.key_id,
-            quoteString(path));
-
-    String key = it->second;
-    if (FileEncryption::calculateKeyHash(key) != header.key_hash)
-        throw Exception(
-            ErrorCodes::DATA_ENCRYPTION_ERROR, "Wrong key with ID {}, could not decipher file {}", header.key_id, quoteString(path));
-
-    return key;
-}
-
-}

 void DiskEncryptedTransaction::copyFile(const std::string & from_file_path, const std::string & to_file_path)
 {
     auto wrapped_from_path = wrappedPath(from_file_path);
@@ -98,16 +80,15 @@ std::unique_ptr<WriteBufferFromFileBase> DiskEncryptedTransaction::writeFile( //
             /// Append mode: we continue to use the same header.
             auto read_buffer = delegate_disk->readFile(wrapped_path, ReadSettings().adjustBufferSize(FileEncryption::Header::kSize));
             header = readHeader(*read_buffer);
-            key = getKey(path, header, current_settings);
+            key = current_settings.findKeyByFingerprint(header.key_fingerprint, path);
         }
     }
     if (!old_file_size)
     {
         /// Rewrite mode: we generate a new header.
-        key = getCurrentKey(path, current_settings);
         header.algorithm = current_settings.current_algorithm;
-        header.key_id = current_settings.current_key_id;
-        header.key_hash = FileEncryption::calculateKeyHash(key);
+        key = current_settings.current_key;
+        header.key_fingerprint = current_settings.current_key_fingerprint;
         header.init_vector = FileEncryption::InitVector::random();
     }
     auto buffer = delegate_transaction->writeFile(wrapped_path, buf_size, mode, settings, autocommit);
@@ -18,9 +18,13 @@ struct DiskEncryptedSettings
 {
     DiskPtr wrapped_disk;
     String disk_path;
-    std::unordered_map<UInt64, String> keys;
-    UInt64 current_key_id;
+    String current_key;
+    UInt128 current_key_fingerprint;
     FileEncryption::Algorithm current_algorithm;
+    std::unordered_map<UInt128 /* fingerprint */, String /* key */> all_keys;
+
+    /// Returns an encryption key found by its fingerprint.
+    String findKeyByFingerprint(UInt128 key_fingerprint, const String & path_for_logs) const;
 };
@@ -13,19 +13,6 @@ WriteBufferWithFinalizeCallback::WriteBufferWithFinalizeCallback(
 {
 }

-
-WriteBufferWithFinalizeCallback::~WriteBufferWithFinalizeCallback()
-{
-    try
-    {
-        finalize();
-    }
-    catch (...)
-    {
-        tryLogCurrentException(__PRETTY_FUNCTION__);
-    }
-}
-
 void WriteBufferWithFinalizeCallback::finalizeImpl()
 {
     WriteBufferFromFileDecorator::finalizeImpl();
@@ -19,8 +19,6 @@ public:
         FinalizeCallback && create_callback_,
         const String & remote_path_);

-    ~WriteBufferWithFinalizeCallback() override;
-
     String getFileName() const override { return remote_path; }

 private:
@@ -37,8 +37,10 @@ protected:
         auto settings = std::make_unique<DiskEncryptedSettings>();
         settings->wrapped_disk = local_disk;
         settings->current_algorithm = algorithm;
-        settings->keys[0] = key;
-        settings->current_key_id = 0;
+        auto fingerprint = FileEncryption::calculateKeyFingerprint(key);
+        settings->all_keys[fingerprint] = key;
+        settings->current_key = key;
+        settings->current_key_fingerprint = fingerprint;
         settings->disk_path = path;
         encrypted_disk = std::make_shared<DiskEncrypted>("encrypted_disk", std::move(settings), true);
     }
@@ -255,7 +257,7 @@ TEST_F(DiskEncryptedTest, RandomIV)

     String bina = getBinaryRepresentation(getDirectory() + "a.txt");
     String binb = getBinaryRepresentation(getDirectory() + "b.txt");
-    constexpr size_t iv_offset = 16;
+    constexpr size_t iv_offset = 23; /// See the description of the format in the comment for FileEncryption::Header.
     constexpr size_t iv_size = FileEncryption::InitVector::kSize;
     EXPECT_EQ(bina.substr(0, iv_offset), binb.substr(0, iv_offset)); /// Part of the header before IV is the same.
     EXPECT_NE(bina.substr(iv_offset, iv_size), binb.substr(iv_offset, iv_size)); /// IV differs.
@@ -1,13 +1,13 @@
-#include <DataTypes/DataTypeNullable.h>
 #include <DataTypes/DataTypeArray.h>
-#include <DataTypes/DataTypeTuple.h>
-#include <DataTypes/DataTypeMap.h>
 #include <DataTypes/DataTypeLowCardinality.h>
+#include <DataTypes/DataTypeMap.h>
+#include <DataTypes/DataTypeNullable.h>
+#include <DataTypes/DataTypeTuple.h>
 #include <Formats/ReadSchemaUtils.h>
-#include <Processors/Formats/ISchemaReader.h>
-#include <Common/assert_cast.h>
 #include <Interpreters/Context.h>
+#include <Processors/Formats/ISchemaReader.h>
 #include <Storages/IStorage.h>
+#include <Common/assert_cast.h>

 namespace DB
 {
@@ -20,8 +20,7 @@ namespace ErrorCodes
     extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
 }

-static std::optional<NamesAndTypesList> getOrderedColumnsList(
-    const NamesAndTypesList & columns_list, const Names & columns_order_hint)
+static std::optional<NamesAndTypesList> getOrderedColumnsList(const NamesAndTypesList & columns_list, const Names & columns_order_hint)
 {
     if (columns_list.size() != columns_order_hint.size())
         return {};
@@ -65,7 +64,8 @@ ColumnsDescription readSchemaFromFormat(
         }
         catch (Exception & e)
         {
-            e.addMessage(fmt::format("Cannot extract table structure from {} format file. You can specify the structure manually", format_name));
+            e.addMessage(
+                fmt::format("Cannot extract table structure from {} format file. You can specify the structure manually", format_name));
             throw;
         }
     }
@@ -73,7 +73,8 @@ ColumnsDescription readSchemaFromFormat(
     {
         std::string exception_messages;
         SchemaReaderPtr schema_reader;
-        size_t max_rows_to_read = format_settings ? format_settings->max_rows_to_read_for_schema_inference : context->getSettingsRef().input_format_max_rows_to_read_for_schema_inference;
+        size_t max_rows_to_read = format_settings ? format_settings->max_rows_to_read_for_schema_inference
+                                                  : context->getSettingsRef().input_format_max_rows_to_read_for_schema_inference;
         size_t iterations = 0;
         ColumnsDescription cached_columns;
         while (true)
@@ -88,8 +89,8 @@ ColumnsDescription readSchemaFromFormat(
             }
             catch (Exception & e)
             {
-                e.addMessage(fmt::format(
-                    "Cannot extract table structure from {} format file. You can specify the structure manually", format_name));
+                e.addMessage(
+                    fmt::format("Cannot extract table structure from {} format file. You can specify the structure manually", format_name));
                 throw;
             }
             catch (...)
@@ -109,7 +110,8 @@ ColumnsDescription readSchemaFromFormat(
                 auto exception_message = fmt::format("Cannot extract table structure from {} format file, file is empty", format_name);

                 if (!retry)
-                    throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "{}. You can specify the structure manually", exception_message);
+                    throw Exception(
+                        ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "{}. You can specify the structure manually", exception_message);

                 exception_messages += "\n" + exception_message;
                 continue;
@@ -132,7 +134,8 @@ ColumnsDescription readSchemaFromFormat(
                     max_rows_to_read -= schema_reader->getNumRowsRead();
                     if (rows_read != 0 && max_rows_to_read == 0)
                     {
-                        exception_message += "\nTo increase the maximum number of rows to read for structure determination, use setting input_format_max_rows_to_read_for_schema_inference";
+                        exception_message += "\nTo increase the maximum number of rows to read for structure determination, use setting "
+                                             "input_format_max_rows_to_read_for_schema_inference";
                         if (iterations > 1)
                         {
                             exception_messages += "\n" + exception_message;
@@ -150,15 +153,18 @@ ColumnsDescription readSchemaFromFormat(
             }
             catch (Exception & e)
             {
-                e.addMessage(fmt::format("Cannot extract table structure from {} format file. You can specify the structure manually", format_name));
+                e.addMessage(fmt::format(
+                    "Cannot extract table structure from {} format file. You can specify the structure manually", format_name));
                 throw;
             }
             catch (...)
             {
-                throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
-                    "Cannot extract table structure from {} format file. "
-                    "Error: {}. You can specify the structure manually",
-                    format_name, exception_message);
+                throw Exception(
+                    ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
+                    "Cannot extract table structure from {} format file. "
+                    "Error: {}. You can specify the structure manually",
+                    format_name,
+                    exception_message);
             }
         }
@@ -170,9 +176,11 @@ ColumnsDescription readSchemaFromFormat(
         return cached_columns;

     if (names_and_types.empty())
-        throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
-            "All attempts to extract table structure from files failed. "
-            "Errors:{}\nYou can specify the structure manually", exception_messages);
+        throw Exception(
+            ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
+            "All attempts to extract table structure from files failed. "
+            "Errors:{}\nYou can specify the structure manually",
+            exception_messages);

     /// If we have "INSERT SELECT" query then try to order
     /// columns as they are ordered in table schema for formats
@@ -191,20 +199,30 @@ ColumnsDescription readSchemaFromFormat(
         }
     }
     else
-        throw Exception(ErrorCodes::BAD_ARGUMENTS,
-            "{} file format doesn't support schema inference. You must specify the structure manually",
-            format_name);
+        throw Exception(
+            ErrorCodes::BAD_ARGUMENTS,
+            "{} file format doesn't support schema inference. You must specify the structure manually",
+            format_name);

     /// Some formats like CSVWithNames can contain empty column names. We don't support empty column names and further processing can fail with an exception. Let's just remove columns with empty names from the structure.
     names_and_types.erase(
         std::remove_if(names_and_types.begin(), names_and_types.end(), [](const NameAndTypePair & pair) { return pair.name.empty(); }),
         names_and_types.end());
     return ColumnsDescription(names_and_types);
 }

-ColumnsDescription readSchemaFromFormat(const String & format_name, const std::optional<FormatSettings> & format_settings, ReadBufferIterator & read_buffer_iterator, bool retry, ContextPtr & context)
+ColumnsDescription readSchemaFromFormat(
+    const String & format_name,
+    const std::optional<FormatSettings> & format_settings,
+    ReadBufferIterator & read_buffer_iterator,
+    bool retry,
+    ContextPtr & context)
 {
     std::unique_ptr<ReadBuffer> buf_out;
     return readSchemaFromFormat(format_name, format_settings, read_buffer_iterator, retry, context, buf_out);
 }

-SchemaCache::Key getKeyForSchemaCache(const String & source, const String & format, const std::optional<FormatSettings> & format_settings, const ContextPtr & context)
+SchemaCache::Key getKeyForSchemaCache(
+    const String & source, const String & format, const std::optional<FormatSettings> & format_settings, const ContextPtr & context)
 {
     return getKeysForSchemaCache({source}, format, format_settings, context).front();
 }
@@ -214,7 +232,8 @@ static SchemaCache::Key makeSchemaCacheKey(const String & source, const String &
     return SchemaCache::Key{source, format, additional_format_info};
 }

-SchemaCache::Keys getKeysForSchemaCache(const Strings & sources, const String & format, const std::optional<FormatSettings> & format_settings, const ContextPtr & context)
+SchemaCache::Keys getKeysForSchemaCache(
+    const Strings & sources, const String & format, const std::optional<FormatSettings> & format_settings, const ContextPtr & context)
 {
     /// For some formats data schema depends on some settings, so it's possible that
     /// two queries to the same source will get two different schemas. To process this
@@ -224,7 +243,11 @@ SchemaCache::Keys getKeysForSchemaCache(const Strings & sources, const String &
     String additional_format_info = FormatFactory::instance().getAdditionalInfoForSchemaCache(format, context, format_settings);
     SchemaCache::Keys cache_keys;
     cache_keys.reserve(sources.size());
-    std::transform(sources.begin(), sources.end(), std::back_inserter(cache_keys), [&](const auto & source){ return makeSchemaCacheKey(source, format, additional_format_info); });
+    std::transform(
+        sources.begin(),
+        sources.end(),
+        std::back_inserter(cache_keys),
+        [&](const auto & source) { return makeSchemaCacheKey(source, format, additional_format_info); });
     return cache_keys;
 }
@@ -191,7 +191,10 @@ namespace
         explicit StreamFromWriteBuffer(std::unique_ptr<WriteBuffer> write_buffer_)
             : write_buffer(std::move(write_buffer_)), start_offset(write_buffer->count()) {}

-        ~StreamFromWriteBuffer() { write_buffer->finalize(); }
+        ~StreamFromWriteBuffer()
+        {
+            write_buffer->finalize();
+        }

         static int closeFileFunc(void *, void * stream)
         {
@@ -34,6 +34,7 @@ namespace
             case Algorithm::AES_128_CTR: return EVP_aes_128_ctr();
             case Algorithm::AES_192_CTR: return EVP_aes_192_ctr();
             case Algorithm::AES_256_CTR: return EVP_aes_256_ctr();
+            case Algorithm::MAX: break;
         }
         throw Exception(
             ErrorCodes::BAD_ARGUMENTS,
@@ -187,10 +188,14 @@ namespace
         return plaintext_size;
     }

-    constexpr const char kHeaderSignature[] = "ENC";
-    constexpr const UInt16 kHeaderCurrentVersion = 1;
-}
+    constexpr const std::string_view kHeaderSignature = "ENC";
+
+    UInt128 calculateV1KeyFingerprint(UInt8 small_key_hash, UInt64 key_id)
+    {
+        /// In version 1 we stored {key_id, very_small_hash(key)} instead of a fingerprint.
+        return static_cast<UInt128>(key_id) | (static_cast<UInt128>(small_key_hash) << 64);
+    }
+}

 String toString(Algorithm algorithm)
 {
@@ -199,6 +204,7 @@ String toString(Algorithm algorithm)
         case Algorithm::AES_128_CTR: return "aes_128_ctr";
         case Algorithm::AES_192_CTR: return "aes_192_ctr";
        case Algorithm::AES_256_CTR: return "aes_256_ctr";
+        case Algorithm::MAX: break;
     }
     throw Exception(
         ErrorCodes::BAD_ARGUMENTS,
@@ -206,14 +212,14 @@ String toString(Algorithm algorithm)
         static_cast<int>(algorithm));
 }

-void parseFromString(Algorithm & algorithm, const String & str)
+Algorithm parseAlgorithmFromString(const String & str)
 {
     if (boost::iequals(str, "aes_128_ctr"))
-        algorithm = Algorithm::AES_128_CTR;
+        return Algorithm::AES_128_CTR;
     else if (boost::iequals(str, "aes_192_ctr"))
-        algorithm = Algorithm::AES_192_CTR;
+        return Algorithm::AES_192_CTR;
     else if (boost::iequals(str, "aes_256_ctr"))
-        algorithm = Algorithm::AES_256_CTR;
+        return Algorithm::AES_256_CTR;
     else
         throw Exception(
             ErrorCodes::BAD_ARGUMENTS,
@@ -221,7 +227,7 @@ void parseFromString(Algorithm & algorithm, const String & str)
             str);
 }

-void checkKeySize(Algorithm algorithm, size_t key_size) { checkKeySize(getCipher(algorithm), key_size); }
+void checkKeySize(size_t key_size, Algorithm algorithm) { checkKeySize(getCipher(algorithm), key_size); }


 String InitVector::toString() const
@@ -364,54 +370,92 @@ void Encryptor::decrypt(const char * data, size_t size, char * out)

 void Header::read(ReadBuffer & in)
 {
-    constexpr size_t header_signature_size = std::size(kHeaderSignature) - 1;
-    char signature[std::size(kHeaderSignature)] = {};
-    in.readStrict(signature, header_signature_size);
-    if (strcmp(signature, kHeaderSignature) != 0)
+    char signature[kHeaderSignature.length()];
+    in.readStrict(signature, kHeaderSignature.length());
+    if (memcmp(signature, kHeaderSignature.data(), kHeaderSignature.length()) != 0)
         throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Wrong signature, this is not an encrypted file");

     UInt16 version;
-    readPODBinary(version, in);
-    if (version != kHeaderCurrentVersion)
+
+    /// The endianness of how the header is written.
+    /// Starting from version 2 the header is always in little endian.
+    std::endian endian = std::endian::little;
+
+    readBinaryLittleEndian(version, in);
+
+    if (version == 0x0100ULL)
+    {
+        /// Version 1 could write the header of an encrypted file in either little endian or big endian.
+        /// So now, if we read the version as little-endian and it's 256, that means two things: the version is actually 1 and the whole header is in big endian.
+        endian = std::endian::big;
+        version = 1;
+    }
+
+    if (version < 1 || version > kCurrentVersion)
         throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Version {} of the header is not supported", version);

     UInt16 algorithm_u16;
     readPODBinary(algorithm_u16, in);
+    if (std::endian::native != endian)
+        algorithm_u16 = std::byteswap(algorithm_u16);
     if (algorithm_u16 >= static_cast<UInt16>(Algorithm::MAX))
         throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Algorithm {} is not supported", algorithm_u16);
     algorithm = static_cast<Algorithm>(algorithm_u16);

-    readPODBinary(key_id, in);
-    readPODBinary(key_hash, in);
+    size_t bytes_to_skip = kSize - kHeaderSignature.length() - sizeof(version) - sizeof(algorithm_u16) - InitVector::kSize;
+
+    if (version < 2)
+    {
+        UInt64 key_id;
+        UInt8 small_key_hash;
+        readPODBinary(key_id, in);
+        readPODBinary(small_key_hash, in);
+        bytes_to_skip -= sizeof(key_id) + sizeof(small_key_hash);
+        if (std::endian::native != endian)
+            key_id = std::byteswap(key_id);
+        key_fingerprint = calculateV1KeyFingerprint(small_key_hash, key_id);
+    }
+    else
+    {
+        readBinaryLittleEndian(key_fingerprint, in);
+        bytes_to_skip -= sizeof(key_fingerprint);
+    }

     init_vector.read(in);

-    constexpr size_t reserved_size = kSize - header_signature_size - sizeof(version) - sizeof(algorithm_u16) - sizeof(key_id) - sizeof(key_hash) - InitVector::kSize;
-    static_assert(reserved_size < kSize);
-    in.ignore(reserved_size);
+    chassert(bytes_to_skip < kSize);
+    in.ignore(bytes_to_skip);
 }

 void Header::write(WriteBuffer & out) const
 {
-    constexpr size_t header_signature_size = std::size(kHeaderSignature) - 1;
-    out.write(kHeaderSignature, header_signature_size);
+    writeString(kHeaderSignature, out);

-    UInt16 version = kHeaderCurrentVersion;
-    writePODBinary(version, out);
+    writeBinaryLittleEndian(version, out);

     UInt16 algorithm_u16 = static_cast<UInt16>(algorithm);
-    writePODBinary(algorithm_u16, out);
+    writeBinaryLittleEndian(algorithm_u16, out);

-    writePODBinary(key_id, out);
-    writePODBinary(key_hash, out);
+    writeBinaryLittleEndian(key_fingerprint, out);
+
     init_vector.write(out);

-    constexpr size_t reserved_size = kSize - header_signature_size - sizeof(version) - sizeof(algorithm_u16) - sizeof(key_id) - sizeof(key_hash) - InitVector::kSize;
+    constexpr size_t reserved_size = kSize - kHeaderSignature.length() - sizeof(version) - sizeof(algorithm_u16) - sizeof(key_fingerprint) - InitVector::kSize;
     static_assert(reserved_size < kSize);
-    char reserved_zero_bytes[reserved_size] = {};
-    out.write(reserved_zero_bytes, reserved_size);
+    char zero_bytes[reserved_size] = {};
+    out.write(zero_bytes, reserved_size);
 }

-UInt8 calculateKeyHash(const String & key)
+UInt128 calculateKeyFingerprint(const String & key)
 {
-    return static_cast<UInt8>(sipHash64(key.data(), key.size())) & 0x0F;
+    const UInt64 seed0 = 0x4368456E63727970ULL; // ChEncryp
+    const UInt64 seed1 = 0x7465644469736B46ULL; // tedDiskF
+    return sipHash128Keyed(seed0, seed1, key.data(), key.size());
 }

+UInt128 calculateV1KeyFingerprint(const String & key, UInt64 key_id)
+{
+    /// In version 1 we stored {key_id, very_small_hash(key)} instead of a fingerprint.
+    UInt8 small_key_hash = sipHash64(key.data(), key.size()) & 0x0F;
+    return calculateV1KeyFingerprint(small_key_hash, key_id);
+}
+
 }
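Two details of Header::read above are easy to illustrate standalone, assuming a little-endian host and a compiler with the unsigned __int128 extension (GCC/Clang):

    #include <cstdint>
    #include <cstring>
    #include <iostream>

    using UInt128 = unsigned __int128;

    // v1 fingerprints pack {key_id, small_key_hash} into one 128-bit value:
    // the key id in the low 64 bits, the tiny hash in the high bits.
    UInt128 v1Fingerprint(std::uint8_t small_key_hash, std::uint64_t key_id)
    {
        return static_cast<UInt128>(key_id) | (static_cast<UInt128>(small_key_hash) << 64);
    }

    int main()
    {
        // A version-1 header could be written big-endian. Reading its two
        // version bytes {0x00, 0x01} as little-endian yields 0x0100 == 256,
        // which is how the reader detects "version 1, big-endian header".
        const unsigned char bytes[2] = {0x00, 0x01};
        std::uint16_t version;
        std::memcpy(&version, bytes, sizeof(version));
        if (version == 0x0100)
            version = 1;
        std::cout << "version " << version << '\n';                  // version 1

        UInt128 fp = v1Fingerprint(/*small_key_hash=*/0x0A, /*key_id=*/7);
        std::cout << static_cast<std::uint64_t>(fp) << ' '            // 7
                  << static_cast<std::uint64_t>(fp >> 64) << '\n';    // 10
    }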

@@ -23,13 +23,14 @@ enum class Algorithm
     AES_128_CTR, /// Size of key is 16 bytes.
     AES_192_CTR, /// Size of key is 24 bytes.
     AES_256_CTR, /// Size of key is 32 bytes.
+    MAX
 };

 String toString(Algorithm algorithm);
-void parseFromString(Algorithm & algorithm, const String & str);
+Algorithm parseAlgorithmFromString(const String & str);

 /// Throws an exception if a specified key size doesn't correspond to a specified encryption algorithm.
-void checkKeySize(Algorithm algorithm, size_t key_size);
+void checkKeySize(size_t key_size, Algorithm algorithm);


 /// Initialization vector. Its size is always 16 bytes.
@@ -103,15 +104,34 @@ private:


 /// File header which is stored at the beginning of encrypted files.
+///
+/// The format of that header is the following:
+/// +--------+------+--------------------------------------------------------------------------+
+/// | offset | size | description                                                              |
+/// +--------+------+--------------------------------------------------------------------------+
+/// | 0      | 3    | 'E', 'N', 'C' (file's signature)                                         |
+/// | 3      | 2    | version of this header (1..2)                                            |
+/// | 5      | 2    | encryption algorithm (0..2, 0=AES_128_CTR, 1=AES_192_CTR, 2=AES_256_CTR) |
+/// | 7      | 16   | fingerprint of encryption key (SipHash)                                  |
+/// | 23     | 16   | initialization vector (randomly generated)                               |
+/// | 39     | 25   | reserved for future use                                                  |
+/// +--------+------+--------------------------------------------------------------------------+
+///
 struct Header
 {
+    /// Versions:
+    /// 1 - Initial version
+    /// 2 - The header of an encrypted file contains the fingerprint of the used encryption key instead of a pair {key_id, very_small_hash(key)}.
+    ///     The header is always stored in little endian.
+    static constexpr const UInt16 kCurrentVersion = 2;
+
+    UInt16 version = kCurrentVersion;
+
     /// Encryption algorithm.
     Algorithm algorithm = Algorithm::AES_128_CTR;

-    /// Identifier of the key to encrypt or decrypt this file.
-    UInt64 key_id = 0;
-
-    /// Hash of the key to encrypt or decrypt this file.
-    UInt8 key_hash = 0;
+    /// Fingerprint of a key.
+    UInt128 key_fingerprint = 0;

     InitVector init_vector;

@@ -122,9 +142,11 @@ struct Header
     void write(WriteBuffer & out) const;
 };

-/// Calculates the hash of a passed key.
-/// 1 byte is enough because this hash is used only for the first check.
-UInt8 calculateKeyHash(const String & key);
+/// Calculates the fingerprint of a passed encryption key.
+UInt128 calculateKeyFingerprint(const String & key);
+
+/// Calculates a kind of fingerprint of a passed encryption key & key ID as it was implemented in version 1.
+UInt128 calculateV1KeyFingerprint(const String & key, UInt64 key_id);

 }
 }
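The offsets in the header comment can be sanity-checked mechanically. A self-contained sketch (the 64-byte total and 25 reserved bytes are inferred from the table above, not read from the real definition):

    #include <cstddef>
    #include <cstdint>

    constexpr std::size_t kSignatureSize   = 3;                      // 'E','N','C'
    constexpr std::size_t kVersionSize     = sizeof(std::uint16_t);  // 2
    constexpr std::size_t kAlgorithmSize   = sizeof(std::uint16_t);  // 2
    constexpr std::size_t kFingerprintSize = 16;                     // UInt128
    constexpr std::size_t kIVSize          = 16;
    constexpr std::size_t kReservedSize    = 25;

    constexpr std::size_t kFingerprintOffset = kSignatureSize + kVersionSize + kAlgorithmSize;
    constexpr std::size_t kIVOffset          = kFingerprintOffset + kFingerprintSize;
    constexpr std::size_t kHeaderSize        = kIVOffset + kIVSize + kReservedSize;

    static_assert(kFingerprintOffset == 7);
    static_assert(kIVOffset == 23);   // matches the iv_offset = 23 used in the updated unit test
    static_assert(kHeaderSize == 64);

    int main() {}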

@@ -30,15 +30,6 @@ void WriteBufferFromFileDecorator::finalizeImpl()

 WriteBufferFromFileDecorator::~WriteBufferFromFileDecorator()
 {
-    try
-    {
-        finalize();
-    }
-    catch (...)
-    {
-        tryLogCurrentException(__PRETTY_FUNCTION__);
-    }
-
     /// It is not a mistake that swap is called here.
     /// Swap has been called at the constructor, so it should be called at the destructor
     /// in order to provide a valid buffer for impl's d-tor call.
@@ -106,7 +106,14 @@ WriteBufferFromPocoSocket::WriteBufferFromPocoSocket(Poco::Net::Socket & socket_

 WriteBufferFromPocoSocket::~WriteBufferFromPocoSocket()
 {
-    finalize();
+    try
+    {
+        finalize();
+    }
+    catch (...)
+    {
+        tryLogCurrentException(__PRETTY_FUNCTION__);
+    }
 }

 }
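The pattern in this destructor (and the ones removed elsewhere in this commit) is the standard "no exceptions out of a destructor" guard. A minimal self-contained sketch with a hypothetical buffer type:

    #include <iostream>
    #include <stdexcept>

    struct Buffer
    {
        static void finalize() { throw std::runtime_error("flush failed"); }

        ~Buffer()
        {
            try
            {
                finalize();
            }
            catch (...)
            {
                // Stand-in for tryLogCurrentException(__PRETTY_FUNCTION__):
                // log and swallow, since a throwing destructor would terminate.
                std::cerr << "error in ~Buffer, suppressed\n";
            }
        }
    };

    int main()
    {
        Buffer b;  // destructor logs instead of propagating the exception
    }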

@@ -109,8 +109,8 @@ void WriteBufferFromS3::nextImpl()

     if (is_prefinalized)
         throw Exception(
             ErrorCodes::LOGICAL_ERROR,
             "Cannot write to prefinalized buffer for S3, the file could have been created with PutObjectRequest");

     /// It makes sense to call waitIfAny before adding a new async task to check if there is an exception.
     /// The faster the exception is propagated, the less time is spent on cancellation.
@@ -242,7 +242,13 @@ WriteBufferFromS3::~WriteBufferFromS3()
     // The destructor could be called with finalized=false in case of exceptions.
     if (!finalized)
     {
-        LOG_ERROR(log, "WriteBufferFromS3 is not finalized in destructor. It could be if an exception occurs. File is not written to S3. {}.", getLogDetails());
+        LOG_INFO(log,
+            "WriteBufferFromS3 is not finalized in destructor. "
+            "It could be if an exception occurs. File is not written to S3. "
+            "{}. "
+            "Stack trace: {}",
+            getLogDetails(),
+            StackTrace().toString());
     }

     task_tracker->safeWaitAll();
@@ -162,15 +162,8 @@ void WriteBufferFromS3::TaskTracker::waitTilInflightShrink()

     for (auto & it : finished_futures)
     {
-        SCOPE_EXIT({
-            /// According to basic exception safety, TaskTracker has to be destroyed after an exception.
-            /// If that were true, this SCOPE_EXIT would be superfluous.
-            /// However, WriteBufferWithFinalizeCallback and WriteBufferFromFileDecorator do call finalize in the d-tor.
-            /// TaskTracker has to cope with this until the issue with finalizing in the d-tor is addressed in #50274.
-            futures.erase(it);
-        });
-
         it->get();
+        futures.erase(it);
     }

     finished_futures.clear();
@@ -49,6 +49,8 @@ private:
     /// waitTilInflightShrink waits till the number of in-flight tasks drops below the limit `max_tasks_inflight`.
     void waitTilInflightShrink() TSA_NO_THREAD_SAFETY_ANALYSIS;

+    void collectFinishedFutures(bool propagate_exceptions) TSA_REQUIRES(mutex);
+
     const bool is_async;
     ThreadPoolCallbackRunner<void> scheduler;
     const size_t max_tasks_inflight;
@@ -226,8 +226,7 @@ TEST(FileEncryptionPositionUpdateTest, Decryption)
     String key = "1234567812345678";
     FileEncryption::Header header;
     header.algorithm = Algorithm::AES_128_CTR;
-    header.key_id = 1;
-    header.key_hash = calculateKeyHash(key);
+    header.key_fingerprint = calculateKeyFingerprint(key);
     header.init_vector = InitVector::random();

     auto lwb = std::make_unique<WriteBufferFromFile>(tmp_path);
@@ -609,9 +609,16 @@ protected:
         test_with_pool = GetParam();
         client = MockS3::Client::CreateClient(bucket);
         if (test_with_pool)
+        {
+            /// Do not block the main thread while awaiting the other tasks.
+            /// This test uses only one thread overall.
+            getSettings().s3_max_inflight_parts_for_one_file = 0;
             async_policy = std::make_unique<MockS3::SimpleAsyncTasks>();
+        }
         else
+        {
             async_policy = std::make_unique<MockS3::BaseSyncPolicy>();
+        }
     }
 };
@@ -592,7 +592,6 @@ bool FileCache::tryReserve(FileSegment & file_segment, const size_t size)
     std::unordered_map<Key, EvictionCandidates> to_delete;
     size_t freeable_space = 0, freeable_count = 0;

-    size_t removed_size = 0;
     auto iterate_func = [&](LockedKey & locked_key, FileSegmentMetadataPtr segment_metadata)
     {
         chassert(segment_metadata->file_segment->assertCorrectness());
@@ -659,8 +658,8 @@ bool FileCache::tryReserve(FileSegment & file_segment, const size_t size)
         && freeable_count == 0 && main_priority->getElementsCount(cache_lock) == main_priority->getElementsLimit());

     LOG_TEST(
-        log, "Overflow: {}, size: {}, ready to remove: {}, current cache size: {}/{}, elements: {}/{}, while reserving for {}:{}",
-        is_overflow, size, removed_size,
+        log, "Overflow: {}, size: {}, ready to remove: {} ({} in number), current cache size: {}/{}, elements: {}/{}, while reserving for {}:{}",
+        is_overflow, size, freeable_space, freeable_count,
         main_priority->getSize(cache_lock), main_priority->getSizeLimit(),
         main_priority->getElementsCount(cache_lock), main_priority->getElementsLimit(),
         file_segment.key(), file_segment.offset());
@@ -93,10 +93,13 @@ void InterserverIOHTTPHandler::handleRequest(HTTPServerRequest & request, HTTPSe

     auto write_response = [&](const std::string & message)
     {
-        if (response.sent())
-            return;
-
         auto & out = *used_output.out;
+        if (response.sent())
+        {
+            out.finalize();
+            return;
+        }

         try
         {
             writeString(message, out);
@@ -127,7 +130,10 @@ void InterserverIOHTTPHandler::handleRequest(HTTPServerRequest & request, HTTPSe
     catch (Exception & e)
     {
         if (e.code() == ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES)
+        {
+            used_output.out->finalize();
             return;
+        }

         response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
@ -490,6 +490,7 @@ private:
{
/// Stop ParallelFormattingOutputFormat correctly.
writer.reset();
write_buf->finalize();
throw;
}
}

@ -166,6 +166,7 @@ UInt32 GinIndexStore::getNextSegmentIDRange(const String & file_name, size_t n)
/// Write segment ID 1
writeVarUInt(1, *ostr);
ostr->sync();
ostr->finalize();
}

/// Read id in file

@ -188,6 +189,7 @@ UInt32 GinIndexStore::getNextSegmentIDRange(const String & file_name, size_t n)

writeVarUInt(result + n, *ostr);
ostr->sync();
ostr->finalize();
}
return result;
}

@ -317,8 +319,13 @@ void GinIndexStore::writeSegment()
current_segment.segment_id = getNextSegmentID();

metadata_file_stream->sync();
metadata_file_stream->finalize();

dict_file_stream->sync();
dict_file_stream->finalize();

postings_file_stream->sync();
postings_file_stream->finalize();
}

GinIndexStoreDeserializer::GinIndexStoreDeserializer(const GinIndexStorePtr & store_)

@ -119,22 +119,12 @@ void MergedBlockOutputStream::Finalizer::Impl::finish()
part->getDataPartStorage().removeFile(file_name);
}

MergedBlockOutputStream::Finalizer::~Finalizer()
{
try
{
finish();
}
catch (...)
{
tryLogCurrentException("MergedBlockOutputStream");
}
}

MergedBlockOutputStream::Finalizer::Finalizer(Finalizer &&) noexcept = default;
MergedBlockOutputStream::Finalizer & MergedBlockOutputStream::Finalizer::operator=(Finalizer &&) noexcept = default;
MergedBlockOutputStream::Finalizer::Finalizer(std::unique_ptr<Impl> impl_) : impl(std::move(impl_)) {}

MergedBlockOutputStream::Finalizer::~Finalizer() = default;

void MergedBlockOutputStream::finalizePart(
const MergeTreeMutableDataPartPtr & new_part,
bool sync,

@ -44,9 +44,10 @@ public:
std::unique_ptr<Impl> impl;

explicit Finalizer(std::unique_ptr<Impl> impl_);
~Finalizer();
Finalizer(Finalizer &&) noexcept;
Finalizer & operator=(Finalizer &&) noexcept;
~Finalizer();

void finish();
};

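A side note on the Finalizer change above: once the destructor body stopped calling finish(), it could become `= default`, but it still has to be defined in the .cpp file, where `Impl` is a complete type; a defaulted destructor in the header would force `std::unique_ptr<Impl>` to instantiate its deleter against an incomplete type. A generic sketch of this pImpl idiom, using a hypothetical class rather than the actual Finalizer:

```
// widget.h — declarations only; Impl stays incomplete here.
#include <memory>

class Widget
{
public:
    Widget();
    ~Widget();                          // defined out of line, where Impl is complete
    Widget(Widget &&) noexcept;         // same for the move operations
    Widget & operator=(Widget &&) noexcept;

private:
    struct Impl;                        // incomplete type
    std::unique_ptr<Impl> impl;
};

// widget.cpp — Impl is complete, so the compiler can generate the deleter.
struct Widget::Impl { int state = 0; };

Widget::Widget() : impl(std::make_unique<Impl>()) {}
Widget::~Widget() = default;
Widget::Widget(Widget &&) noexcept = default;
Widget & Widget::operator=(Widget &&) noexcept = default;
```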
@ -955,6 +955,7 @@ private:
{
/// Stop ParallelFormattingOutputFormat correctly.
writer.reset();
write_buf->finalize();
throw;
}
}

@ -341,7 +341,10 @@ private:
void finalize()
{
compressed.next();
compressed.finalize();

plain->next();
plain->finalize();
}
};

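In the hunk just above, the order matters: the compression buffer wraps the plain file buffer, so the outer buffer must flush its final compressed frame into the inner one before the inner buffer is finalized. A sketch of that ordering with hypothetical types, not the actual buffer classes:

```
// Sketch: finalizing nested buffers outer-first.
struct PlainBuffer
{
    void next() { /* flush the internal buffer to the file */ }
    void finalize() { /* flush and mark the buffer complete */ }
};

struct CompressedBuffer
{
    explicit CompressedBuffer(PlainBuffer & out_) : out(out_) {}
    void next() { /* compress pending bytes and write them into `out` */ }
    void finalize() { /* emit the final compressed frame into `out` */ }
    PlainBuffer & out;
};

void finalizeBoth(CompressedBuffer & compressed, PlainBuffer & plain)
{
    // Outer first: its last frame still has to travel through `plain`.
    compressed.next();
    compressed.finalize();

    // Only now is it safe to finalize the underlying buffer.
    plain.next();
    plain.finalize();
}
```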
@ -831,6 +831,7 @@ private:
{
/// Stop ParallelFormattingOutputFormat correctly.
writer.reset();
write_buf->finalize();
throw;
}
}

@ -480,6 +480,7 @@ void StorageURLSink::finalize()
{
/// Stop ParallelFormattingOutputFormat correctly.
writer.reset();
write_buf->finalize();
throw;
}
}

@ -21,8 +21,6 @@
01072_optimize_skip_unused_shards_const_expr_eval
01083_expressions_in_engine_arguments
01086_odbc_roundtrip
01142_join_lc_and_nullable_in_key
01142_merge_join_lc_and_nullable_in_key
01152_cross_replication
01155_rename_move_materialized_view
01173_transaction_control_queries

@ -39,8 +37,6 @@
01319_optimize_skip_unused_shards_nesting
01353_low_cardinality_join_types
01455_shard_leaf_max_rows_bytes_to_read
01476_right_full_join_switch
01477_lc_in_merge_join_left_key
01487_distributed_in_not_default_db
01495_subqueries_in_with_statement
01504_rocksdb

@ -70,9 +70,12 @@ This pull-request will be merged automatically as it reaches the mergeable state

### If the PR was closed and then reopened

If it stuck, check {pr_url} for `{label_backports_created}` and delete it if \
If it stuck, check {pr_url} for `{backport_created_label}` and delete it if \
necessary. Manually merging will do nothing, since `{label_backports_created}` \
prevents the original PR {pr_url} from being processed.

If you want to recreate the PR: delete the `{label_cherrypick}` label and delete this branch.
You may also need to delete the `{label_backports_created}` label from the original PR.
"""
BACKPORT_DESCRIPTION = """This pull-request is a last step of an automated \
backporting.

@ -82,7 +85,13 @@ close it.
"""
REMOTE = ""

def __init__(self, name: str, pr: PullRequest, repo: Repository):
def __init__(
self,
name: str,
pr: PullRequest,
repo: Repository,
backport_created_label: str = Labels.BACKPORTS_CREATED,
):
self.name = name
self.pr = pr
self.repo = repo

@ -93,6 +102,8 @@ close it.
self.backport_pr = None  # type: Optional[PullRequest]
self._backported = False

self.backport_created_label = backport_created_label

self.git_prefix = (  # All commits to cherrypick are done as robot-clickhouse
"git -c user.email=robot-clickhouse@users.noreply.github.com "
"-c user.name=robot-clickhouse -c commit.gpgsign=false"

@ -226,7 +237,8 @@ close it.
body=self.CHERRYPICK_DESCRIPTION.format(
pr_number=self.pr.number,
pr_url=self.pr.html_url,
label_backports_created=Labels.BACKPORTS_CREATED,
backport_created_label=self.backport_created_label,
label_cherrypick=Labels.CHERRYPICK,
),
base=self.backport_branch,
head=self.cherrypick_branch,

@ -459,11 +471,12 @@ class Backport:
pr_labels = [label.name for label in pr.labels]
if self.must_create_backport_label in pr_labels:
branches = [
ReleaseBranch(br, pr, self.repo) for br in self.release_branches
ReleaseBranch(br, pr, self.repo, self.backport_created_label)
for br in self.release_branches
]  # type: List[ReleaseBranch]
else:
branches = [
ReleaseBranch(br, pr, self.repo)
ReleaseBranch(br, pr, self.repo, self.backport_created_label)
for br in [
label.split("-", 1)[0][1:]  # v21.8-must-backport
for label in pr_labels

@ -492,6 +505,7 @@ class Backport:
)
bp_cp_prs = self.gh.get_pulls_from_search(
query=f"type:pr repo:{self._repo_name} {query_suffix}",
label=f"{Labels.BACKPORT},{Labels.CHERRYPICK}",
)
for br in branches:
br.pop_prs(bp_cp_prs)

@ -1963,9 +1963,9 @@ class ClickHouseCluster:
return output

def copy_file_to_container(self, container_id, local_path, dest_path):
with open(local_path, "r") as fdata:
with open(local_path, "rb") as fdata:
data = fdata.read()
encodedBytes = base64.b64encode(data.encode("utf-8"))
encodedBytes = base64.b64encode(data)
encodedStr = str(encodedBytes, "utf-8")
self.exec_in_container(
container_id,

@ -1974,7 +1974,6 @@ class ClickHouseCluster:
"-c",
"echo {} | base64 --decode > {}".format(encodedStr, dest_path),
],
user="root",
)

def wait_for_url(

Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -1,9 +1,11 @@
import pytest
import os.path
from helpers.cluster import ClickHouseCluster
from helpers.client import QueryRuntimeException
import os.path
from helpers.test_tools import assert_eq_with_retry

SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))

FIRST_PART_NAME = "all_1_1_0"

@ -170,53 +172,62 @@ def test_optimize_table(policy, encrypted_disk):
assert node.query(select_query) == "(0,'data'),(1,'data'),(2,'data'),(3,'data')"


# Test adding encryption key on the fly.
def test_add_key():
def make_storage_policy_with_keys(policy_name, keys):
node.exec_in_container(
[
"bash",
"-c",
"""cat > /etc/clickhouse-server/config.d/storage_policy_{policy_name}.xml << EOF
def make_storage_policy_with_keys(
policy_name, keys, check_system_storage_policies=False
):
if check_system_storage_policies:
node.query("SELECT policy_name FROM system.storage_policies")

node.exec_in_container(
[
"bash",
"-c",
"""cat > /etc/clickhouse-server/config.d/storage_policy_{policy_name}.xml << EOF
<clickhouse>
<storage_configuration>
<disks>
<{policy_name}_disk>
<type>encrypted</type>
<disk>disk_local</disk>
<path>{policy_name}_dir/</path>
{keys}
</{policy_name}_disk>
</disks>
<policies>
<{policy_name}>
<volumes>
<main>
<disk>{policy_name}_disk</disk>
</main>
</volumes>
</{policy_name}>
</policies>
</storage_configuration>
<storage_configuration>
<disks>
<{policy_name}_disk>
<type>encrypted</type>
<disk>disk_local</disk>
<path>{policy_name}_dir/</path>
{keys}
</{policy_name}_disk>
</disks>
<policies>
<{policy_name}>
<volumes>
<main>
<disk>{policy_name}_disk</disk>
</main>
</volumes>
</{policy_name}>
</policies>
</storage_configuration>
</clickhouse>
EOF""".format(
policy_name=policy_name, keys=keys
),
]
policy_name=policy_name, keys=keys
),
]
)

node.query("SYSTEM RELOAD CONFIG")

if check_system_storage_policies:
assert_eq_with_retry(
node,
f"SELECT policy_name FROM system.storage_policies WHERE policy_name='{policy_name}'",
policy_name,
)
node.query("SYSTEM RELOAD CONFIG")


# Test adding encryption key on the fly.
def test_add_keys():
keys = "<key>firstfirstfirstf</key>"
make_storage_policy_with_keys(
"encrypted_policy_multikeys", keys, check_system_storage_policies=True
)

# Add some data to an encrypted disk.
node.query("SELECT policy_name FROM system.storage_policies")
make_storage_policy_with_keys(
"encrypted_policy_multikeys", "<key>firstfirstfirstf</key>"
)
assert_eq_with_retry(
node,
"SELECT policy_name FROM system.storage_policies WHERE policy_name='encrypted_policy_multikeys'",
"encrypted_policy_multikeys",
)

node.query(
"""
CREATE TABLE encrypted_test (

@ -233,31 +244,39 @@ EOF""".format(
assert node.query(select_query) == "(0,'data'),(1,'data')"

# Add a second key and start using it.
make_storage_policy_with_keys(
"encrypted_policy_multikeys",
keys = """
<key>firstfirstfirstf</key>
<key>secondsecondseco</key>
<current_key>secondsecondseco</current_key>
"""
<key id="0">firstfirstfirstf</key>
<key id="1">secondsecondseco</key>
<current_key_id>1</current_key_id>
""",
)
make_storage_policy_with_keys("encrypted_policy_multikeys", keys)

node.query("INSERT INTO encrypted_test VALUES (2,'data'),(3,'data')")

# Now "(0,'data'),(1,'data')" is encrypted with the first key and "(2,'data'),(3,'data')" is encrypted with the second key.
# All data are accessible.
assert node.query(select_query) == "(0,'data'),(1,'data'),(2,'data'),(3,'data')"

# Try to replace the first key with something wrong, and check that "(0,'data'),(1,'data')" cannot be read.
make_storage_policy_with_keys(
"encrypted_policy_multikeys",
"""
<key id="0">wrongwrongwrongw</key>
# Keys can be reordered.
keys = """
<key id="1">secondsecondseco</key>
<key id="0">firstfirstfirstf</key>
<current_key_id>1</current_key_id>
""",
)
"""
make_storage_policy_with_keys("encrypted_policy_multikeys", keys)

expected_error = "Wrong key"
# All data are still accessible.
assert node.query(select_query) == "(0,'data'),(1,'data'),(2,'data'),(3,'data')"

# Try to replace the first key with something wrong, and check that "(0,'data'),(1,'data')" cannot be read.
keys = """
<key>secondsecondseco</key>
<key>wrongwrongwrongw</key>
<current_key>secondsecondseco</current_key>
"""
make_storage_policy_with_keys("encrypted_policy_multikeys", keys)

expected_error = "Not found an encryption key required to decipher"
assert expected_error in node.query_and_get_error(select_query)

# Detach the part encrypted with the wrong key and check that another part containing "(2,'data'),(3,'data')" still can be read.

@ -265,6 +284,159 @@ EOF""".format(
assert node.query(select_query) == "(2,'data'),(3,'data')"


# Test adding encryption key on the fly.
def test_add_keys_with_id():
keys = "<key>firstfirstfirstf</key>"
make_storage_policy_with_keys(
"encrypted_policy_multikeys", keys, check_system_storage_policies=True
)

# Add some data to an encrypted disk.
node.query(
"""
CREATE TABLE encrypted_test (
id Int64,
data String
) ENGINE=MergeTree()
ORDER BY id
SETTINGS storage_policy='encrypted_policy_multikeys'
"""
)

node.query("INSERT INTO encrypted_test VALUES (0,'data'),(1,'data')")
select_query = "SELECT * FROM encrypted_test ORDER BY id FORMAT Values"
assert node.query(select_query) == "(0,'data'),(1,'data')"

# Add a second key and start using it.
keys = """
<key id="0">firstfirstfirstf</key>
<key id="1">secondsecondseco</key>
<current_key_id>1</current_key_id>
"""
make_storage_policy_with_keys("encrypted_policy_multikeys", keys)

node.query("INSERT INTO encrypted_test VALUES (2,'data'),(3,'data')")

# Now "(0,'data'),(1,'data')" is encrypted with the first key and "(2,'data'),(3,'data')" is encrypted with the second key.
# All data are accessible.
assert node.query(select_query) == "(0,'data'),(1,'data'),(2,'data'),(3,'data')"

# Keys can be reordered.
keys = """
<key id="1">secondsecondseco</key>
<key id="0">firstfirstfirstf</key>
<current_key_id>1</current_key_id>
"""
make_storage_policy_with_keys("encrypted_policy_multikeys", keys)

# All data are still accessible.
assert node.query(select_query) == "(0,'data'),(1,'data'),(2,'data'),(3,'data')"

# Try to replace the first key with something wrong, and check that "(0,'data'),(1,'data')" cannot be read.
keys = """
<key id="1">secondsecondseco</key>
<key id="0">wrongwrongwrongw</key>
<current_key_id>1</current_key_id>
"""
make_storage_policy_with_keys("encrypted_policy_multikeys", keys)

expected_error = "Not found an encryption key required to decipher"
assert expected_error in node.query_and_get_error(select_query)

# Detach the part encrypted with the wrong key and check that another part containing "(2,'data'),(3,'data')" still can be read.
node.query("ALTER TABLE encrypted_test DETACH PART '{}'".format(FIRST_PART_NAME))
assert node.query(select_query) == "(2,'data'),(3,'data')"


# Test appending of encrypted files.
def test_log_family():
keys = "<key>firstfirstfirstf</key>"
make_storage_policy_with_keys(
"encrypted_policy_multikeys", keys, check_system_storage_policies=True
)

# Add some data to an encrypted disk.
node.query(
"""
CREATE TABLE encrypted_test (
id Int64,
data String
) ENGINE=Log
SETTINGS storage_policy='encrypted_policy_multikeys'
"""
)

node.query("INSERT INTO encrypted_test VALUES (0,'data'),(1,'data')")
select_query = "SELECT * FROM encrypted_test ORDER BY id FORMAT Values"
assert node.query(select_query) == "(0,'data'),(1,'data')"

# Add a second key and start using it.
keys = """
<key>firstfirstfirstf</key>
<key>secondsecondseco</key>
<current_key>secondsecondseco</current_key>
"""
make_storage_policy_with_keys("encrypted_policy_multikeys", keys)

node.query("INSERT INTO encrypted_test VALUES (2,'data'),(3,'data')")
assert node.query(select_query) == "(0,'data'),(1,'data'),(2,'data'),(3,'data')"

# Everything is still encrypted with the first key (because the Log engine appends files), so the second key can be removed.
keys = "<key>firstfirstfirstf</key>"
make_storage_policy_with_keys("encrypted_policy_multikeys", keys)

assert node.query(select_query) == "(0,'data'),(1,'data'),(2,'data'),(3,'data')"


@pytest.mark.parametrize(
"old_version",
["version_1le", "version_1be", "version_2"],
)
def test_migration_from_old_version(old_version):
keys = """
<key id="1">first_key_first_</key>
<key id="2">second_key_secon</key>
<key id="3">third_key_third_</key>
<current_key_id>3</current_key_id>
"""
make_storage_policy_with_keys(
"migration_from_old_version", keys, check_system_storage_policies=True
)

# Create a table without data.
node.query(
"""
CREATE TABLE encrypted_test (
id Int64,
data String
) ENGINE=Log
SETTINGS storage_policy='migration_from_old_version'
"""
)

# Copy table's data from an old version.
data_path = node.query(
"SELECT data_paths[1] FROM system.tables WHERE table = 'encrypted_test'"
).splitlines()[0]
node.query("DETACH TABLE encrypted_test")

old_version_dir = os.path.join(SCRIPT_DIR, "old_versions", old_version)
for file_name in os.listdir(old_version_dir):
src_path = os.path.join(old_version_dir, file_name)
dest_path = os.path.join(data_path, file_name)
node.copy_file_to_container(src_path, dest_path)

node.query("ATTACH TABLE encrypted_test")

# We can read from encrypted disk after migration.
select_query = "SELECT * FROM encrypted_test ORDER BY id FORMAT Values"
assert node.query(select_query) == "(0,'ab'),(1,'cdefg')"

# We can append files on encrypted disk after migration.
node.query("INSERT INTO encrypted_test VALUES (2,'xyz')")
assert node.query(select_query) == "(0,'ab'),(1,'cdefg'),(2,'xyz')"


def test_read_in_order():
node.query(
"CREATE TABLE encrypted_test(`a` UInt64, `b` String(150)) ENGINE = MergeTree() ORDER BY (a, b) SETTINGS storage_policy='encrypted_policy'"

@ -33,6 +33,18 @@
<request_timeout_ms>20000</request_timeout_ms>
<s3_max_inflight_parts_for_one_file>1</s3_max_inflight_parts_for_one_file>
</broken_s3>
<broken_s3_always_multi_part>
<type>s3</type>
<endpoint>http://resolver:8083/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<skip_access_check>true</skip_access_check>
<retry_attempts>0</retry_attempts>
<connect_timeout_ms>20000</connect_timeout_ms>
<request_timeout_ms>20000</request_timeout_ms>
<s3_max_single_part_upload_size>0</s3_max_single_part_upload_size>
<s3_max_inflight_parts_for_one_file>1</s3_max_inflight_parts_for_one_file>
</broken_s3_always_multi_part>
<hdd>
<type>local</type>
<path>/</path>

@ -128,6 +140,13 @@
</main>
</volumes>
</broken_s3>
<broken_s3_always_multi_part>
<volumes>
<main>
<disk>broken_s3_always_multi_part</disk>
</main>
</volumes>
</broken_s3_always_multi_part>
</policies>
</storage_configuration>

@ -930,8 +930,9 @@ def test_merge_canceled_by_drop(cluster, node_name):
)


@pytest.mark.parametrize("storage_policy", ["broken_s3_always_multi_part", "broken_s3"])
@pytest.mark.parametrize("node_name", ["node"])
def test_merge_canceled_by_s3_errors(cluster, node_name):
def test_merge_canceled_by_s3_errors(cluster, node_name, storage_policy):
node = cluster.instances[node_name]
node.query("DROP TABLE IF EXISTS test_merge_canceled_by_s3_errors NO DELAY")
node.query(

@ -939,7 +940,7 @@ def test_merge_canceled_by_s3_errors(cluster, node_name):
" (key UInt32, value String)"
" Engine=MergeTree() "
" ORDER BY value "
" SETTINGS storage_policy='broken_s3'"
f" SETTINGS storage_policy='{storage_policy}'"
)
node.query("SYSTEM STOP MERGES test_merge_canceled_by_s3_errors")
node.query(

@ -1048,8 +1049,8 @@ def test_s3_engine_heavy_write_check_mem(cluster, node_name, in_flight_memory):
" AND type!='QueryStart'"
).split()

assert int(memory_usage) < 1.1 * memory
assert int(memory_usage) > 0.9 * memory
assert int(memory_usage) < 1.2 * memory
assert int(memory_usage) > 0.8 * memory

assert int(wait_inflight) > 10 * 1000 * 1000

@ -1096,7 +1097,7 @@ def test_s3_disk_heavy_write_check_mem(cluster, node_name):
" AND type!='QueryStart'"
)

assert int(result) < 1.1 * memory
assert int(result) > 0.9 * memory
assert int(result) < 1.2 * memory
assert int(result) > 0.8 * memory

check_no_objects_after_drop(cluster, node_name=node_name)

430 tests/queries/0_stateless/02722_matcher_join_use_nulls.reference Normal file
@ -0,0 +1,430 @@
-- { echoOn }

SELECT '============ LEFT JOIN ============' FORMAT Null;
SELECT a, toTypeName(a)
FROM (
SELECT *
FROM (SELECT 1 :: Int32 as a) t1
LEFT JOIN (SELECT 2 :: UInt32 as a) t2
ON t1.a = t2.a
) ORDER BY 1;
1 Int32
SELECT *, * APPLY toTypeName
FROM (
SELECT a
FROM (SELECT 1 :: Int32 as a) t1
LEFT JOIN (SELECT 2 :: UInt32 as a) t2
ON t1.a = t2.a
) ORDER BY 1;
1 Int32
SELECT *, * APPLY toTypeName
FROM (
SELECT *
FROM (SELECT 1 :: Int32 as a) t1
LEFT JOIN (SELECT 2 :: UInt32 as a) t2
ON t1.a = t2.a
) ORDER BY 1;
1 \N Int32 Nullable(UInt32)
SELECT *, * APPLY toTypeName
FROM (
SELECT t1.*
FROM (SELECT 1 :: Int32 as a) t1
LEFT JOIN (SELECT 2 :: UInt32 as a) t2
ON t1.a = t2.a
) ORDER BY 1;
1 Int32
SELECT *, * APPLY toTypeName
FROM (
SELECT t2.*
FROM (SELECT 1 :: Int32 as a) t1
LEFT JOIN (SELECT 2 :: UInt32 as a) t2
ON t1.a = t2.a
) ORDER BY 1;
\N Nullable(UInt32)
SELECT *, * APPLY toTypeName
FROM (
SELECT t2.*
FROM t1
LEFT JOIN t2
ON t1.a = t2.a
) ORDER BY 1;
\N \N Nullable(UInt32) Nullable(UInt32)
SELECT *, * APPLY toTypeName
FROM (
SELECT a
FROM (SELECT 1 :: Int32 as a) t1
LEFT JOIN (SELECT 2 :: UInt32 as a) t2
USING (a)
) ORDER BY 1;
1 Int64
SELECT *, * APPLY toTypeName
FROM (
SELECT *
FROM (SELECT 1 :: Int32 as a) t1
LEFT JOIN (SELECT 2 :: UInt32 as a) t2
USING (a)
) ORDER BY 1;
1 Int64
SELECT *, * APPLY toTypeName
FROM (
SELECT t1.*
FROM (SELECT 1 :: Int32 as a) t1
LEFT JOIN (SELECT 2 :: UInt32 as a) t2
USING (a)
) ORDER BY 1;
1 Int32
SELECT *, * APPLY toTypeName
FROM (
SELECT t2.*
FROM (SELECT 1 :: Int32 as a) t1
LEFT JOIN (SELECT 2 :: UInt32 as a) t2
USING (a)
) ORDER BY 1;
\N Nullable(UInt32)
SELECT *, * APPLY toTypeName
FROM (
SELECT *
FROM t1
LEFT JOIN t2
USING (a)
) ORDER BY 1;
1 \N Int64 Nullable(UInt32)
SELECT a, toTypeName(a)
FROM (
SELECT *
FROM (SELECT 1 :: Int32 as a) t1
LEFT JOIN (SELECT 2 :: UInt32 as key) t2
ON t1.a = t2.key
) ORDER BY 1;
1 Int32
SELECT *, * APPLY toTypeName
FROM (
SELECT a
FROM (SELECT 1 :: Int32 as a) t1
LEFT JOIN (SELECT 2 :: UInt32 as key) t2
ON t1.a = t2.key
) ORDER BY 1;
1 Int32
SELECT *, * APPLY toTypeName
FROM (
SELECT *
FROM (SELECT 1 :: Int32 as a) t1
LEFT JOIN (SELECT 2 :: UInt32 as key) t2
ON t1.a = t2.key
) ORDER BY 1;
1 \N Int32 Nullable(UInt32)
SELECT *, * APPLY toTypeName
FROM (
SELECT t1.*
FROM (SELECT 1 :: Int32 as a) t1
LEFT JOIN (SELECT 2 :: UInt32 as key) t2
ON t1.a = t2.key
) ORDER BY 1;
1 Int32
SELECT *, * APPLY toTypeName
FROM (
SELECT t2.*
FROM (SELECT 1 :: Int32 as a) t1
LEFT JOIN (SELECT 2 :: UInt32 as key) t2
ON t1.a = t2.key
) ORDER BY 1;
\N Nullable(UInt32)
SELECT *, * APPLY toTypeName
FROM (
SELECT t2.*
FROM t1
LEFT JOIN t2
ON t1.a = t2.key
) ORDER BY 1;
\N \N Nullable(UInt32) Nullable(UInt32)
SELECT '============ RIGHT JOIN ============' FORMAT Null;
SELECT a, toTypeName(a)
FROM (
SELECT *
FROM (SELECT 1 :: Int32 as a) t1
RIGHT JOIN (SELECT 2 :: UInt32 as a) t2
ON t1.a = t2.a
) ORDER BY 1;
\N Nullable(Int32)
SELECT *, * APPLY toTypeName
FROM (
SELECT a
FROM (SELECT 1 :: Int32 as a) t1
RIGHT JOIN (SELECT 2 :: UInt32 as a) t2
ON t1.a = t2.a
) ORDER BY 1;
\N Nullable(Int32)
SELECT *, * APPLY toTypeName
FROM (
SELECT *
FROM (SELECT 1 :: Int32 as a) t1
RIGHT JOIN (SELECT 2 :: UInt32 as a) t2
ON t1.a = t2.a
) ORDER BY 1;
\N 2 Nullable(Int32) UInt32
SELECT *, * APPLY toTypeName
FROM (
SELECT t1.*
FROM (SELECT 1 :: Int32 as a) t1
RIGHT JOIN (SELECT 2 :: UInt32 as a) t2
ON t1.a = t2.a
) ORDER BY 1;
\N Nullable(Int32)
SELECT *, * APPLY toTypeName
FROM (
SELECT t2.*
FROM (SELECT 1 :: Int32 as a) t1
RIGHT JOIN (SELECT 2 :: UInt32 as a) t2
ON t1.a = t2.a
) ORDER BY 1;
2 UInt32
SELECT *, * APPLY toTypeName
FROM (
SELECT t2.*
FROM t1
RIGHT JOIN t2
ON t1.a = t2.a
) ORDER BY 1;
2 2 UInt32 UInt32
SELECT *, * APPLY toTypeName
FROM (
SELECT a
FROM (SELECT 1 :: Int32 as a) t1
RIGHT JOIN (SELECT 2 :: UInt32 as a) t2
USING (a)
) ORDER BY 1;
2 Int64
SELECT *, * APPLY toTypeName
FROM (
SELECT *
FROM (SELECT 1 :: Int32 as a) t1
RIGHT JOIN (SELECT 2 :: UInt32 as a) t2
USING (a)
) ORDER BY 1;
2 Int64
SELECT *, * APPLY toTypeName
FROM (
SELECT t1.*
FROM (SELECT 1 :: Int32 as a) t1
RIGHT JOIN (SELECT 2 :: UInt32 as a) t2
USING (a)
) ORDER BY 1;
2 Nullable(Int32)
SELECT *, * APPLY toTypeName
FROM (
SELECT t2.*
FROM (SELECT 1 :: Int32 as a) t1
RIGHT JOIN (SELECT 2 :: UInt32 as a) t2
USING (a)
) ORDER BY 1;
2 UInt32
SELECT *, * APPLY toTypeName
FROM (
SELECT *
FROM t1
RIGHT JOIN t2
USING (a)
) ORDER BY 1;
2 2 Int64 UInt32
SELECT a, toTypeName(a)
FROM (
SELECT *
FROM (SELECT 1 :: Int32 as a) t1
RIGHT JOIN (SELECT 2 :: UInt32 as key) t2
ON t1.a = t2.key
) ORDER BY 1;
\N Nullable(Int32)
SELECT *, * APPLY toTypeName
FROM (
SELECT a
FROM (SELECT 1 :: Int32 as a) t1
RIGHT JOIN (SELECT 2 :: UInt32 as key) t2
ON t1.a = t2.key
) ORDER BY 1;
\N Nullable(Int32)
SELECT *, * APPLY toTypeName
FROM (
SELECT *
FROM (SELECT 1 :: Int32 as a) t1
RIGHT JOIN (SELECT 2 :: UInt32 as key) t2
ON t1.a = t2.key
) ORDER BY 1;
\N 2 Nullable(Int32) UInt32
SELECT *, * APPLY toTypeName
FROM (
SELECT t1.*
FROM (SELECT 1 :: Int32 as a) t1
RIGHT JOIN (SELECT 2 :: UInt32 as key) t2
ON t1.a = t2.key
) ORDER BY 1;
\N Nullable(Int32)
SELECT *, * APPLY toTypeName
FROM (
SELECT t2.*
FROM (SELECT 1 :: Int32 as a) t1
RIGHT JOIN (SELECT 2 :: UInt32 as key) t2
ON t1.a = t2.key
) ORDER BY 1;
2 UInt32
SELECT *, * APPLY toTypeName
FROM (
SELECT t2.*
FROM t1
RIGHT JOIN t2
ON t1.a = t2.key
) ORDER BY 1;
2 2 UInt32 UInt32
SELECT '============ FULL JOIN ============' FORMAT Null;
SELECT a, toTypeName(a)
FROM (
SELECT *
FROM (SELECT 1 :: Int32 as a) t1
FULL JOIN (SELECT 2 :: UInt32 as a) t2
ON t1.a = t2.a
) ORDER BY 1;
1 Nullable(Int32)
\N Nullable(Int32)
SELECT *, * APPLY toTypeName
FROM (
SELECT a
FROM (SELECT 1 :: Int32 as a) t1
FULL JOIN (SELECT 2 :: UInt32 as a) t2
ON t1.a = t2.a
) ORDER BY 1;
1 Nullable(Int32)
\N Nullable(Int32)
SELECT *, * APPLY toTypeName
FROM (
SELECT *
FROM (SELECT 1 :: Int32 as a) t1
FULL JOIN (SELECT 2 :: UInt32 as a) t2
ON t1.a = t2.a
) ORDER BY 1;
1 \N Nullable(Int32) Nullable(UInt32)
\N 2 Nullable(Int32) Nullable(UInt32)
SELECT *, * APPLY toTypeName
FROM (
SELECT t1.*
FROM (SELECT 1 :: Int32 as a) t1
FULL JOIN (SELECT 2 :: UInt32 as a) t2
ON t1.a = t2.a
) ORDER BY 1;
1 Nullable(Int32)
\N Nullable(Int32)
SELECT *, * APPLY toTypeName
FROM (
SELECT t2.*
FROM (SELECT 1 :: Int32 as a) t1
FULL JOIN (SELECT 2 :: UInt32 as a) t2
ON t1.a = t2.a
) ORDER BY 1;
2 Nullable(UInt32)
\N Nullable(UInt32)
SELECT *, * APPLY toTypeName
FROM (
SELECT t2.*
FROM t1
FULL JOIN t2
ON t1.a = t2.a
) ORDER BY 1;
2 2 Nullable(UInt32) Nullable(UInt32)
\N \N Nullable(UInt32) Nullable(UInt32)
SELECT *, * APPLY toTypeName
FROM (
SELECT a
FROM (SELECT 1 :: Int32 as a) t1
FULL JOIN (SELECT 2 :: UInt32 as a) t2
USING (a)
) ORDER BY 1;
1 Nullable(Int64)
2 Nullable(Int64)
SELECT *, * APPLY toTypeName
FROM (
SELECT *
FROM (SELECT 1 :: Int32 as a) t1
FULL JOIN (SELECT 2 :: UInt32 as a) t2
USING (a)
) ORDER BY 1;
1 Nullable(Int64)
2 Nullable(Int64)
SELECT *, * APPLY toTypeName
FROM (
SELECT t1.*
FROM (SELECT 1 :: Int32 as a) t1
FULL JOIN (SELECT 2 :: UInt32 as a) t2
USING (a)
) ORDER BY 1;
1 Nullable(Int32)
2 Nullable(Int32)
SELECT *, * APPLY toTypeName
FROM (
SELECT t2.*
FROM (SELECT 1 :: Int32 as a) t1
FULL JOIN (SELECT 2 :: UInt32 as a) t2
USING (a)
) ORDER BY 1;
2 Nullable(UInt32)
\N Nullable(UInt32)
SELECT *, * APPLY toTypeName
FROM (
SELECT *
FROM t1
FULL JOIN t2
USING (a)
) ORDER BY 1;
1 \N Nullable(Int64) Nullable(UInt32)
2 2 Nullable(Int64) Nullable(UInt32)
SELECT a, toTypeName(a)
FROM (
SELECT *
FROM (SELECT 1 :: Int32 as a) t1
FULL JOIN (SELECT 2 :: UInt32 as key) t2
ON t1.a = t2.key
) ORDER BY 1;
1 Nullable(Int32)
\N Nullable(Int32)
SELECT *, * APPLY toTypeName
FROM (
SELECT a
FROM (SELECT 1 :: Int32 as a) t1
FULL JOIN (SELECT 2 :: UInt32 as key) t2
ON t1.a = t2.key
) ORDER BY 1;
1 Nullable(Int32)
\N Nullable(Int32)
SELECT *, * APPLY toTypeName
FROM (
SELECT *
FROM (SELECT 1 :: Int32 as a) t1
FULL JOIN (SELECT 2 :: UInt32 as key) t2
ON t1.a = t2.key
) ORDER BY 1;
1 \N Nullable(Int32) Nullable(UInt32)
\N 2 Nullable(Int32) Nullable(UInt32)
SELECT *, * APPLY toTypeName
FROM (
SELECT t1.*
FROM (SELECT 1 :: Int32 as a) t1
FULL JOIN (SELECT 2 :: UInt32 as key) t2
ON t1.a = t2.key
) ORDER BY 1;
1 Nullable(Int32)
\N Nullable(Int32)
SELECT *, * APPLY toTypeName
FROM (
SELECT t2.*
FROM (SELECT 1 :: Int32 as a) t1
FULL JOIN (SELECT 2 :: UInt32 as key) t2
ON t1.a = t2.key
) ORDER BY 1;
2 Nullable(UInt32)
\N Nullable(UInt32)
SELECT *, * APPLY toTypeName
FROM (
SELECT t2.*
FROM t1
FULL JOIN t2
ON t1.a = t2.key
) ORDER BY 1;
2 2 Nullable(UInt32) Nullable(UInt32)
\N \N Nullable(UInt32) Nullable(UInt32)
119 tests/queries/0_stateless/02722_matcher_join_use_nulls.sql.j2 Normal file
@ -0,0 +1,119 @@
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;

CREATE TABLE t1 (a Int32) ENGINE = TinyLog;
CREATE TABLE t2 (a UInt32, key UInt32) ENGINE = TinyLog;

INSERT INTO t1 VALUES (1);
INSERT INTO t2 VALUES (2, 2);

SET join_use_nulls = 1;
SET allow_experimental_analyzer = 1;

-- { echoOn }

{% for KIND in ('LEFT', 'RIGHT', 'FULL') -%}

SELECT '============ {{ KIND }} JOIN ============' FORMAT Null;

{% for right_column_name in ['a', 'key'] -%}

SELECT a, toTypeName(a)
FROM (
SELECT *
FROM (SELECT 1 :: Int32 as a) t1
{{ KIND }} JOIN (SELECT 2 :: UInt32 as {{ right_column_name }}) t2
ON t1.a = t2.{{ right_column_name }}
) ORDER BY 1;

SELECT *, * APPLY toTypeName
FROM (
SELECT a
FROM (SELECT 1 :: Int32 as a) t1
{{ KIND }} JOIN (SELECT 2 :: UInt32 as {{ right_column_name }}) t2
ON t1.a = t2.{{ right_column_name }}
) ORDER BY 1;

SELECT *, * APPLY toTypeName
FROM (
SELECT *
FROM (SELECT 1 :: Int32 as a) t1
{{ KIND }} JOIN (SELECT 2 :: UInt32 as {{ right_column_name }}) t2
ON t1.a = t2.{{ right_column_name }}
) ORDER BY 1;

SELECT *, * APPLY toTypeName
FROM (
SELECT t1.*
FROM (SELECT 1 :: Int32 as a) t1
{{ KIND }} JOIN (SELECT 2 :: UInt32 as {{ right_column_name }}) t2
ON t1.a = t2.{{ right_column_name }}
) ORDER BY 1;

SELECT *, * APPLY toTypeName
FROM (
SELECT t2.*
FROM (SELECT 1 :: Int32 as a) t1
{{ KIND }} JOIN (SELECT 2 :: UInt32 as {{ right_column_name }}) t2
ON t1.a = t2.{{ right_column_name }}
) ORDER BY 1;

SELECT *, * APPLY toTypeName
FROM (
SELECT t2.*
FROM t1
{{ KIND }} JOIN t2
ON t1.a = t2.{{ right_column_name }}
) ORDER BY 1;

{% if right_column_name == 'a' -%}

SELECT *, * APPLY toTypeName
FROM (
SELECT a
FROM (SELECT 1 :: Int32 as a) t1
{{ KIND }} JOIN (SELECT 2 :: UInt32 as {{ right_column_name }}) t2
USING (a)
) ORDER BY 1;

SELECT *, * APPLY toTypeName
FROM (
SELECT *
FROM (SELECT 1 :: Int32 as a) t1
{{ KIND }} JOIN (SELECT 2 :: UInt32 as {{ right_column_name }}) t2
USING (a)
) ORDER BY 1;

SELECT *, * APPLY toTypeName
FROM (
SELECT t1.*
FROM (SELECT 1 :: Int32 as a) t1
{{ KIND }} JOIN (SELECT 2 :: UInt32 as {{ right_column_name }}) t2
USING (a)
) ORDER BY 1;

SELECT *, * APPLY toTypeName
FROM (
SELECT t2.*
FROM (SELECT 1 :: Int32 as a) t1
{{ KIND }} JOIN (SELECT 2 :: UInt32 as {{ right_column_name }}) t2
USING (a)
) ORDER BY 1;

SELECT *, * APPLY toTypeName
FROM (
SELECT *
FROM t1
{{ KIND }} JOIN t2
USING (a)
) ORDER BY 1;

{% endif -%}

{% endfor -%}
{% endfor -%}

-- { echoOff }

DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;
@ -8,3 +8,4 @@ nan 1 1 1 1
nan nan 1 1 1 1
--
nan
--
@ -7,11 +7,13 @@ SELECT nan AS lhs, cast(nan, 'Float32') AS rhs, lhs = rhs, lhs = materialize(rhs

SELECT '--';

DROP TABLE IF EXISTS test_table;
CREATE TABLE test_table
(
id UInt32,
value UInt32
) ENGINE = MergeTree ORDER BY id;

INSERT INTO test_table VALUES (76, 57);

SELECT value FROM (SELECT stddevSamp(id) AS value FROM test_table) as subquery

@ -33,6 +35,7 @@ CREATE TABLE test_table
value_1 UInt32,
value_2 Float32
) ENGINE = MergeTree ORDER BY id;

INSERT INTO test_table VALUES (12000, 36, 77.94);

SELECT value

@ -40,3 +43,18 @@ FROM (SELECT (corr(value_1, value_1) OVER test_window) AS value FROM test_table
WHERE not (not (value <> value));

DROP TABLE test_table;

SELECT '--';

CREATE TABLE test_table
(
id Float32,
value Float32
) ENGINE=MergeTree ORDER BY id;

INSERT INTO test_table VALUES (-10.75, 95.57);

SELECT * FROM (SELECT corr(id, id) as corr_value FROM test_table GROUP BY value) AS subquery LEFT ANTI JOIN test_table ON (subquery.corr_value = test_table.id)
WHERE (test_table.id >= test_table.id) AND (NOT (test_table.id >= test_table.id));

DROP TABLE test_table;