Compare commits

...

25 Commits

Author | SHA1 | Message | Date
Daniil Ivanik | 8083d5cc3a | Merge 56ab216e63 into 7eee149487 | 2024-11-21 13:12:51 +00:00
divanik | 56ab216e63 | To checkout | 2024-11-21 12:46:19 +00:00
Vladimir Cherkasov | 7eee149487 | Merge pull request #72145 from ClickHouse/vdimir/fix02374_analyzer_join_using (FIx 02374_analyzer_join_using) | 2024-11-21 10:35:46 +00:00
Antonio Andelic | cc3c7e74ae | Merge pull request #70523 from ClickHouse/randomize-keeper-feature-flasgs-keeper (Randomize Keeper feature flags in integration tests) | 2024-11-21 08:20:07 +00:00
vdimir | f45bd58849 | FIx 02374_analyzer_join_using | 2024-11-20 11:51:42 +00:00
Antonio Andelic | 57db5cf24c | Randomize correctly | 2024-11-19 13:39:54 +01:00
Antonio Andelic | 459fa898ed | Merge branch 'master' into randomize-keeper-feature-flasgs-keeper | 2024-11-19 10:00:41 +01:00
divanik | f76df37cfa | Add getAllFilesMask | 2024-11-18 17:17:26 +00:00
divanik | 63621bd381 | Divided logic by files | 2024-11-18 16:27:11 +00:00
divanik | 5746970ea9 | To checkout | 2024-11-17 13:41:23 +00:00
Antonio Andelic | 0d875ecf5c | Always randomize in private | 2024-11-04 12:56:14 +01:00
Antonio Andelic | 6698212b5a | Fix test | 2024-10-31 13:39:41 +01:00
Antonio Andelic | c787838cb2 | Merge branch 'master' into randomize-keeper-feature-flasgs-keeper | 2024-10-31 12:01:31 +01:00
Antonio Andelic | eb020f1c4b | Fix RemoveRecursive | 2024-10-29 09:05:31 +01:00
Antonio Andelic | 1a40df4d0c | Merge branch 'master' into randomize-keeper-feature-flasgs-keeper | 2024-10-28 12:07:38 +01:00
Antonio Andelic | 4380c6035d | Merge branch 'master' into randomize-keeper-feature-flasgs-keeper | 2024-10-15 16:51:36 +02:00
Antonio Andelic | 5145281088 | Correct randomization | 2024-10-15 16:51:32 +02:00
Antonio Andelic | 35fa4c43e4 | More fixes | 2024-10-10 19:39:28 +02:00
robot-clickhouse | 293e076493 | Automatic style fix | 2024-10-10 14:03:18 +00:00
Antonio Andelic | 8b92603c6d | Fix old version | 2024-10-10 15:52:56 +02:00
Antonio Andelic | fb14f6e029 | Fix MultiRead | 2024-10-10 15:52:37 +02:00
robot-clickhouse | e1f37ec2bb | Automatic style fix | 2024-10-10 07:54:28 +00:00
Antonio Andelic | cc0ef6104f | Fix MultiRead | 2024-10-10 09:45:42 +02:00
robot-clickhouse | 46ce65e66e | Automatic style fix | 2024-10-09 16:21:09 +00:00
Antonio Andelic | e048893b85 | Randomize feature flags in integration test | 2024-10-09 18:11:50 +02:00
26 changed files with 825 additions and 258 deletions

View File

@ -135,6 +135,7 @@ add_headers_and_sources(dbms Storages/ObjectStorage/S3)
add_headers_and_sources(dbms Storages/ObjectStorage/HDFS)
add_headers_and_sources(dbms Storages/ObjectStorage/Local)
add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes)
add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes/Iceberg)
add_headers_and_sources(dbms Common/NamedCollections)
add_headers_and_sources(dbms Common/Scheduler/Workload)

View File

@ -291,3 +291,15 @@ namespace cctz_extension
ZoneInfoSourceFactory zone_info_source_factory = custom_factory;
}
DateLUTImpl::Values DateLUTImpl::lutIndexByMonthSinceEpochStartsZeroIndexing(Int32 months) const
{
Int16 year = 1970 + months / 12;
UInt8 month = months % 12 + 1;
return lut[makeLUTIndex(year, month, 1)];
}
DateLUTImpl::Values DateLUTImpl::lutIndexByYearSinceEpochStartsZeroIndexing(Int16 years) const
{
return lut[makeLUTIndex(years + 1970, 1, 1)];
}
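For context, a minimal sketch of what the two new helpers compute (not part of the change; the function name and numeric inputs are made up):

#include <Common/DateLUT.h>

void dateLutHelpersExample()
{
    const DateLUTImpl & lut = DateLUT::instance();
    /// 660 months since 1970-01: year = 1970 + 660 / 12 = 2025, month = 660 % 12 + 1 = 1,
    /// so this returns the Values entry for 2025-01-01.
    DateLUTImpl::Values by_month = lut.lutIndexByMonthSinceEpochStartsZeroIndexing(660);
    /// 55 years since 1970: also the Values entry for 2025-01-01.
    DateLUTImpl::Values by_year = lut.lutIndexByYearSinceEpochStartsZeroIndexing(55);
    (void)by_month;
    (void)by_year;
}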

View File

@ -1166,6 +1166,10 @@ public:
return LUTIndex{std::min(index, static_cast<UInt32>(DATE_LUT_SIZE - 1))};
}
Values lutIndexByMonthSinceEpochStartsZeroIndexing(Int32 months) const;
Values lutIndexByYearSinceEpochStartsZeroIndexing(Int16 years) const;
/// Create DayNum from year, month, day of month.
ExtendedDayNum makeDayNum(Int16 year, UInt8 month, UInt8 day_of_month, Int32 default_error_day_num = 0) const
{

View File

@ -341,7 +341,10 @@ Coordination::Error ZooKeeper::tryGetChildren(
const EventPtr & watch,
Coordination::ListRequestType list_request_type)
{
return tryGetChildrenWatch(path, res, stat,
return tryGetChildrenWatch(
path,
res,
stat,
watch ? std::make_shared<Coordination::WatchCallback>(callbackForEvent(watch)) : Coordination::WatchCallbackPtr{},
list_request_type);
}
@ -975,11 +978,14 @@ void ZooKeeper::removeRecursive(const std::string & path, uint32_t remove_nodes_
Coordination::Error ZooKeeper::tryRemoveRecursive(const std::string & path, uint32_t remove_nodes_limit)
{
if (!isFeatureEnabled(DB::KeeperFeatureFlag::REMOVE_RECURSIVE))
const auto fallback_method = [&]
{
tryRemoveChildrenRecursive(path);
return tryRemove(path);
}
};
if (!isFeatureEnabled(DB::KeeperFeatureFlag::REMOVE_RECURSIVE))
return fallback_method();
auto promise = std::make_shared<std::promise<Coordination::RemoveRecursiveResponse>>();
auto future = promise->get_future();
@ -998,6 +1004,10 @@ Coordination::Error ZooKeeper::tryRemoveRecursive(const std::string & path, uint
}
auto response = future.get();
if (response.error == Coordination::Error::ZNOTEMPTY) /// limit was too low, try without RemoveRecursive request
return fallback_method();
return response.error;
}

View File

@ -486,13 +486,13 @@ public:
/// Remove the node with the subtree.
/// If Keeper supports RemoveRecursive operation then it will be performed atomically.
/// Otherwise if someone concurrently adds or removes a node in the subtree, the result is undefined.
void removeRecursive(const std::string & path, uint32_t remove_nodes_limit = 100);
void removeRecursive(const std::string & path, uint32_t remove_nodes_limit = 1000);
/// Same as removeRecursive but in case if Keeper does not supports RemoveRecursive and
/// if someone concurrently removes a node in the subtree, this will not cause errors.
/// For instance, you can call this method twice concurrently for the same node and the end
/// result would be the same as for the single call.
Coordination::Error tryRemoveRecursive(const std::string & path, uint32_t remove_nodes_limit = 100);
Coordination::Error tryRemoveRecursive(const std::string & path, uint32_t remove_nodes_limit = 1000);
/// Similar to removeRecursive(...) and tryRemoveRecursive(...), but does not remove path itself.
/// Node defined as RemoveException will not be deleted.
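For context, a minimal usage sketch of the two entry points with the new default limit (the helper function and paths are hypothetical, not part of the change):

#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/ZooKeeper.h>

/// Sketch: assumes an already-initialised zkutil client.
void removeSubtreesExample(const zkutil::ZooKeeperPtr & zookeeper)
{
    /// Throwing variant: performed atomically when the server supports RemoveRecursive.
    zookeeper->removeRecursive("/clickhouse/tmp_subtree_a");

    /// Non-throwing variant: falls back to child-by-child removal on servers without
    /// the REMOVE_RECURSIVE feature flag, so concurrent removals do not cause errors.
    /// Both entry points now default remove_nodes_limit to 1000 (previously 100).
    Coordination::Error code = zookeeper->tryRemoveRecursive("/clickhouse/tmp_subtree_b");
    if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE)
        throw zkutil::KeeperException::fromPath(code, "/clickhouse/tmp_subtree_b");
}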

View File

@ -767,6 +767,11 @@ size_t ZooKeeperMultiRequest::sizeImpl() const
}
void ZooKeeperMultiRequest::readImpl(ReadBuffer & in)
{
readImpl(in, /*request_validator=*/{});
}
void ZooKeeperMultiRequest::readImpl(ReadBuffer & in, RequestValidator request_validator)
{
while (true)
{
@ -788,6 +793,8 @@ void ZooKeeperMultiRequest::readImpl(ReadBuffer & in)
ZooKeeperRequestPtr request = ZooKeeperRequestFactory::instance().get(op_num);
request->readImpl(in);
if (request_validator)
request_validator(*request);
requests.push_back(request);
if (in.eof())

View File

@ -570,6 +570,9 @@ struct ZooKeeperMultiRequest final : MultiRequest<ZooKeeperRequestPtr>, ZooKeepe
void writeImpl(WriteBuffer & out) const override;
size_t sizeImpl() const override;
void readImpl(ReadBuffer & in) override;
using RequestValidator = std::function<void(const ZooKeeperRequest &)>;
void readImpl(ReadBuffer & in, RequestValidator request_validator);
std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override;

View File

@ -514,7 +514,13 @@ void KeeperContext::initializeFeatureFlags(const Poco::Util::AbstractConfigurati
feature_flags.disableFeatureFlag(feature_flag.value());
}
if (feature_flags.isEnabled(KeeperFeatureFlag::MULTI_READ))
feature_flags.enableFeatureFlag(KeeperFeatureFlag::FILTERED_LIST);
else
system_nodes_with_data[keeper_api_version_path] = toString(static_cast<uint8_t>(KeeperApiVersion::ZOOKEEPER_COMPATIBLE));
system_nodes_with_data[keeper_api_feature_flags_path] = feature_flags.getFeatureFlags();
}
feature_flags.logFlags(getLogger("KeeperContext"));
@ -569,6 +575,25 @@ const CoordinationSettings & KeeperContext::getCoordinationSettings() const
return *coordination_settings;
}
bool KeeperContext::isOperationSupported(Coordination::OpNum operation) const
{
switch (operation)
{
case Coordination::OpNum::FilteredList:
return feature_flags.isEnabled(KeeperFeatureFlag::FILTERED_LIST);
case Coordination::OpNum::MultiRead:
return feature_flags.isEnabled(KeeperFeatureFlag::MULTI_READ);
case Coordination::OpNum::CreateIfNotExists:
return feature_flags.isEnabled(KeeperFeatureFlag::CREATE_IF_NOT_EXISTS);
case Coordination::OpNum::CheckNotExists:
return feature_flags.isEnabled(KeeperFeatureFlag::CHECK_NOT_EXISTS);
case Coordination::OpNum::RemoveRecursive:
return feature_flags.isEnabled(KeeperFeatureFlag::REMOVE_RECURSIVE);
default:
return true;
}
}
uint64_t KeeperContext::lastCommittedIndex() const
{
return last_committed_log_idx.load(std::memory_order_relaxed);

View File

@ -1,6 +1,7 @@
#pragma once
#include <Coordination/KeeperFeatureFlags.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/ZooKeeper/ZooKeeperConstants.h>
#include <atomic>
#include <condition_variable>
#include <cstdint>
@ -103,6 +104,7 @@ public:
return precommit_sleep_probability_for_testing;
}
bool isOperationSupported(Coordination::OpNum operation) const;
private:
/// local disk defined using path or disk name
using Storage = std::variant<DiskPtr, std::string>;

View File

@ -1,5 +1,4 @@
#include <Server/KeeperTCPHandler.h>
#include "Common/ZooKeeper/ZooKeeperConstants.h"
#if USE_NURAFT
@ -19,6 +18,8 @@
# include <Common/NetException.h>
# include <Common/PipeFDs.h>
# include <Common/Stopwatch.h>
# include <Common/ZooKeeper/ZooKeeperCommon.h>
# include <Common/ZooKeeper/ZooKeeperConstants.h>
# include <Common/ZooKeeper/ZooKeeperIO.h>
# include <Common/logger_useful.h>
# include <Common/setThreadName.h>
@ -63,6 +64,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int UNEXPECTED_PACKET_FROM_CLIENT;
extern const int TIMEOUT_EXCEEDED;
extern const int BAD_ARGUMENTS;
}
struct PollResult
@ -637,7 +639,23 @@ std::pair<Coordination::OpNum, Coordination::XID> KeeperTCPHandler::receiveReque
Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(opnum);
request->xid = xid;
request->readImpl(read_buffer);
auto request_validator = [&](const Coordination::ZooKeeperRequest & current_request)
{
if (!keeper_dispatcher->getKeeperContext()->isOperationSupported(current_request.getOpNum()))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported operation: {}", current_request.getOpNum());
};
if (auto * multi_request = dynamic_cast<Coordination::ZooKeeperMultiRequest *>(request.get()))
{
multi_request->readImpl(read_buffer, request_validator);
}
else
{
request->readImpl(read_buffer);
request_validator(*request);
}
if (!keeper_dispatcher->putRequest(request, session_id, use_xid_64))
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Session {} already disconnected", session_id);

View File

@ -5,7 +5,7 @@
#include <Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h>
#include <Storages/ObjectStorage/DataLakes/HudiMetadata.h>
#include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>
#include <Storages/ObjectStorage/DataLakes/IcebergMetadata.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h>
#include <Storages/ObjectStorage/HDFS/Configuration.h>
#include <Storages/ObjectStorage/Local/Configuration.h>
#include <Storages/ObjectStorage/S3/Configuration.h>
@ -56,6 +56,14 @@ public:
return std::nullopt;
}
void implementPartitionPruning(const ActionsDAG & filter_dag) override
{
if (!current_metadata || !current_metadata->supportsPartitionPruning())
return;
BaseStorageConfiguration::setPaths(current_metadata->makePartitionPruning(filter_dag));
}
private:
DataLakeMetadataPtr current_metadata;

View File

@ -2,6 +2,7 @@
#include <boost/noncopyable.hpp>
#include <Core/Types.h>
#include <Core/NamesAndTypes.h>
#include "Interpreters/ActionsDAG.h"
#include "PartitionColumns.h"
namespace DB
@ -16,6 +17,8 @@ public:
virtual bool operator==(const IDataLakeMetadata & other) const = 0;
virtual const DataLakePartitionColumns & getPartitionColumns() const = 0;
virtual const std::unordered_map<String, String> & getColumnNameToPhysicalNameMapping() const = 0;
virtual bool supportsPartitionPruning() { return false; }
virtual Strings makePartitionPruning(const ActionsDAG &) { return {}; }
};
using DataLakeMetadataPtr = std::unique_ptr<IDataLakeMetadata>;
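A hypothetical metadata implementation would opt into the new pruning hooks roughly as follows (a sketch only: the class and its members are invented, and the rest of the IDataLakeMetadata interface is stubbed out):

#include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>

namespace DB
{

/// Sketch of a metadata class that opts into partition pruning.
class ExamplePrunableMetadata : public IDataLakeMetadata
{
public:
    bool supportsPartitionPruning() override { return true; }

    Strings makePartitionPruning(const ActionsDAG & /*filter_dag*/) override
    {
        /// A real implementation would evaluate the filter against partition ranges;
        /// this sketch just returns a precomputed subset of paths.
        return pruned_data_files;
    }

    /// Remaining interface, stubbed out for the sketch.
    Strings getDataFiles() const override { return all_data_files; }
    NamesAndTypesList getTableSchema() const override { return {}; }
    bool operator==(const IDataLakeMetadata &) const override { return false; }
    const DataLakePartitionColumns & getPartitionColumns() const override { return partition_columns; }
    const std::unordered_map<String, String> & getColumnNameToPhysicalNameMapping() const override { return column_mapping; }

private:
    Strings all_data_files;
    Strings pruned_data_files;
    DataLakePartitionColumns partition_columns;
    std::unordered_map<String, String> column_mapping;
};

}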

View File

@ -1,44 +1,46 @@
#include "config.h"
#if USE_AVRO
#include <Common/logger_useful.h>
#include <Core/Settings.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnTuple.h>
#include <Columns/IColumn.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypesNumber.h>
#include <Formats/FormatFactory.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadHelpers.h>
#include <Processors/Formats/Impl/AvroRowInputFormat.h>
#include <Storages/ObjectStorage/DataLakes/IcebergMetadata.h>
#include <Storages/ObjectStorage/DataLakes/Common.h>
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
# include <Columns/ColumnString.h>
# include <Columns/ColumnTuple.h>
# include <Columns/IColumn.h>
# include <Core/Settings.h>
# include <DataTypes/DataTypeArray.h>
# include <DataTypes/DataTypeDate.h>
# include <DataTypes/DataTypeDateTime64.h>
# include <DataTypes/DataTypeFactory.h>
# include <DataTypes/DataTypeFixedString.h>
# include <DataTypes/DataTypeMap.h>
# include <DataTypes/DataTypeNullable.h>
# include <DataTypes/DataTypeString.h>
# include <DataTypes/DataTypeTuple.h>
# include <DataTypes/DataTypeUUID.h>
# include <DataTypes/DataTypesDecimal.h>
# include <DataTypes/DataTypesNumber.h>
# include <Formats/FormatFactory.h>
# include <IO/ReadBufferFromFileBase.h>
# include <IO/ReadBufferFromString.h>
# include <IO/ReadHelpers.h>
# include <Processors/Formats/Impl/AvroRowInputFormat.h>
# include <Storages/ObjectStorage/DataLakes/Common.h>
# include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h>
# include <Storages/ObjectStorage/StorageObjectStorageSource.h>
# include <Common/logger_useful.h>
#include <Poco/JSON/Array.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include <filesystem>
# include <Poco/JSON/Array.h>
# include <Poco/JSON/Object.h>
# include <Poco/JSON/Parser.h>
# include <filesystem>
namespace DB
{
namespace Setting
{
extern const SettingsBool iceberg_engine_ignore_schema_evolution;
extern const SettingsBool iceberg_engine_ignore_schema_evolution;
}
namespace ErrorCodes
@ -73,20 +75,6 @@ IcebergMetadata::IcebergMetadata(
namespace
{
enum class ManifestEntryStatus : uint8_t
{
EXISTING = 0,
ADDED = 1,
DELETED = 2,
};
enum class DataFileContent : uint8_t
{
DATA = 0,
POSITION_DELETES = 1,
EQUALITY_DELETES = 2,
};
/**
* Iceberg supports the next data types (see https://iceberg.apache.org/spec/#schemas-and-data-types):
* - Primitive types:
@ -245,10 +233,10 @@ DataTypePtr getFieldType(const Poco::JSON::Object::Ptr & field, const String & t
}
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected 'type' field: {}", type.toString());
}
std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, int format_version, bool ignore_schema_evolution)
std::tuple<NamesAndTypesList, Int32, std::unordered_map<Int32, NameAndTypePair>>
parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, int format_version, bool ignore_schema_evolution)
{
Poco::JSON::Object::Ptr schema;
Int32 current_schema_id;
@ -279,20 +267,24 @@ std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::P
}
if (!schema)
throw Exception(ErrorCodes::BAD_ARGUMENTS, R"(There is no schema with "schema-id" that matches "current-schema-id" in metadata)");
throw Exception(
ErrorCodes::BAD_ARGUMENTS, R"(There is no schema with "schema-id" that matches "current-schema-id" in metadata)");
}
else
{
if (schemas->size() != 1)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
throw Exception(
ErrorCodes::UNSUPPORTED_METHOD,
"Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is "
"supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, enable setting "
"supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, "
"enable setting "
"iceberg_engine_ignore_schema_evolution (Note: enabling this setting can lead to incorrect result)");
/// Now we sure that there is only one schema.
schema = schemas->getObject(0);
if (schema->getValue<int>("schema-id") != current_schema_id)
throw Exception(ErrorCodes::BAD_ARGUMENTS, R"(Field "schema-id" of the schema doesn't match "current-schema-id" in metadata)");
throw Exception(
ErrorCodes::BAD_ARGUMENTS, R"(Field "schema-id" of the schema doesn't match "current-schema-id" in metadata)");
}
}
else
@ -301,30 +293,32 @@ std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::P
current_schema_id = schema->getValue<int>("schema-id");
/// Field "schemas" is optional for version 1, but after version 2 was introduced,
/// in most cases this field is added for new tables in version 1 as well.
if (!ignore_schema_evolution && metadata_object->has("schemas") && metadata_object->get("schemas").extract<Poco::JSON::Array::Ptr>()->size() > 1)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
if (!ignore_schema_evolution && metadata_object->has("schemas")
&& metadata_object->get("schemas").extract<Poco::JSON::Array::Ptr>()->size() > 1)
throw Exception(
ErrorCodes::UNSUPPORTED_METHOD,
"Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not "
"supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, enable setting "
"supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, enable "
"setting "
"iceberg_engine_ignore_schema_evolution (Note: enabling this setting can lead to incorrect result)");
}
NamesAndTypesList names_and_types;
auto fields = schema->get("fields").extract<Poco::JSON::Array::Ptr>();
std::unordered_map<Int32, NameAndTypePair> name_and_type_by_source_id;
for (size_t i = 0; i != fields->size(); ++i)
{
auto field = fields->getObject(static_cast<UInt32>(i));
auto name = field->getValue<String>("name");
bool required = field->getValue<bool>("required");
names_and_types.push_back({name, getFieldType(field, "type", required)});
name_and_type_by_source_id[field->getValue<int>("id")] = {name, names_and_types.back().type};
}
return {std::move(names_and_types), current_schema_id};
return {std::move(names_and_types), current_schema_id, name_and_type_by_source_id};
}
MutableColumns parseAvro(
avro::DataFileReaderBase & file_reader,
const Block & header,
const FormatSettings & settings)
MutableColumns parseAvro(avro::DataFileReaderBase & file_reader, const Block & header, const FormatSettings & settings)
{
auto deserializer = std::make_unique<AvroDeserializer>(header, file_reader.dataSchema(), true, true, settings);
MutableColumns columns = header.cloneEmptyColumns();
@ -345,17 +339,14 @@ MutableColumns parseAvro(
* 1) v<V>.metadata.json, where V - metadata version.
* 2) <V>-<random-uuid>.metadata.json, where V - metadata version
*/
std::pair<Int32, String> getMetadataFileAndVersion(
ObjectStoragePtr object_storage,
const StorageObjectStorage::Configuration & configuration)
std::pair<Int32, String>
getMetadataFileAndVersion(ObjectStoragePtr object_storage, const StorageObjectStorage::Configuration & configuration)
{
const auto metadata_files = listFiles(*object_storage, configuration, "metadata", ".metadata.json");
if (metadata_files.empty())
{
throw Exception(
ErrorCodes::FILE_DOESNT_EXIST,
"The metadata file for Iceberg table with path {} doesn't exist",
configuration.getPath());
ErrorCodes::FILE_DOESNT_EXIST, "The metadata file for Iceberg table with path {} doesn't exist", configuration.getPath());
}
std::vector<std::pair<UInt32, String>> metadata_files_with_versions;
@ -372,7 +363,8 @@ std::pair<Int32, String> getMetadataFileAndVersion(
version_str = String(file_name.begin(), file_name.begin() + file_name.find_first_of('-'));
if (!std::all_of(version_str.begin(), version_str.end(), isdigit))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bad metadata file name: {}. Expected vN.metadata.json where N is a number", file_name);
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "Bad metadata file name: {}. Expected vN.metadata.json where N is a number", file_name);
metadata_files_with_versions.emplace_back(std::stoi(version_str), path);
}
@ -403,7 +395,7 @@ IcebergMetadata::create(ObjectStoragePtr object_storage, ConfigurationObserverPt
const Poco::JSON::Object::Ptr & object = json.extract<Poco::JSON::Object::Ptr>();
auto format_version = object->getValue<int>("format-version");
auto [schema, schema_id]
auto [schema, schema_id, name_and_type_by_source_id]
= parseTableSchema(object, format_version, local_context->getSettingsRef()[Setting::iceberg_engine_ignore_schema_evolution]);
auto current_snapshot_id = object->getValue<Int64>("current-snapshot-id");
@ -456,7 +448,6 @@ Strings IcebergMetadata::getDataFiles() const
if (!data_files.empty())
return data_files;
Strings manifest_files;
if (manifest_list_file.empty())
return data_files;
@ -465,7 +456,8 @@ Strings IcebergMetadata::getDataFiles() const
auto context = getContext();
StorageObjectStorageSource::ObjectInfo object_info(manifest_list_file);
auto manifest_list_buf = StorageObjectStorageSource::createReadBuffer(object_info, object_storage, context, log);
auto manifest_list_file_reader = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*manifest_list_buf));
auto manifest_list_file_reader
= std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*manifest_list_buf));
auto data_type = AvroSchemaReader::avroNodeToDataType(manifest_list_file_reader->dataSchema().root()->leafAt(0));
Block header{{data_type->createColumn(), data_type, "manifest_path"}};
@ -511,7 +503,8 @@ Strings IcebergMetadata::getDataFiles() const
throw Exception(
ErrorCodes::UNSUPPORTED_METHOD,
"Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not "
"supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, enable setting "
"supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, enable "
"setting "
"iceberg_engine_ignore_schema_evolution (Note: enabling this setting can lead to incorrect result)");
avro::NodePtr root_node = manifest_file_reader->dataSchema().root();
@ -520,9 +513,7 @@ Strings IcebergMetadata::getDataFiles() const
if (leaves_num < expected_min_num)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Unexpected number of columns {}. Expected at least {}",
root_node->leaves(), expected_min_num);
ErrorCodes::BAD_ARGUMENTS, "Unexpected number of columns {}. Expected at least {}", root_node->leaves(), expected_min_num);
}
avro::NodePtr status_node = root_node->leafAt(0);
@ -615,7 +606,8 @@ Strings IcebergMetadata::getDataFiles() const
{
Int32 content_type = content_int_column->getElement(i);
if (DataFileContent(content_type) != DataFileContent::DATA)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: positional and equality deletes are not supported");
throw Exception(
ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: positional and equality deletes are not supported");
}
const auto status = status_int_column->getInt(i);
@ -635,14 +627,31 @@ Strings IcebergMetadata::getDataFiles() const
{
LOG_TEST(log, "Processing data file for path: {}", file_path);
files.insert(file_path);
data_files.push_back(file_path);
}
}
ColumnPtr big_partition_column = data_file_tuple_column->getColumnPtr(data_file_tuple_type.getPositionByName("partition"));
if (big_partition_column->getDataType() != TypeIndex::Tuple)
{
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"The parsed column from Avro file of `file_path` field should be Tuple type, got {}",
columns.at(1)->getFamilyName());
}
const auto * big_partition_tuple = assert_cast<const ColumnTuple *>(big_partition_column.get());
std::vector<uint8_t> partition_spec_json_bytes = avro_metadata["partition-spec"];
String partition_spec_json_string
= String(reinterpret_cast<char *>(partition_spec_json_bytes.data()), partition_spec_json_bytes.size());
Poco::Dynamic::Var partition_spec_json = parser.parse(partition_spec_json_string);
const Poco::JSON::Array::Ptr & partition_spec = partition_spec_json.extract<Poco::JSON::Array::Ptr>();
common_partition_infos.push_back(pruning_processor.getCommonPartitionInfo(partition_spec, big_partition_tuple));
specific_partition_infos.push_back(
pruning_processor.getSpecificPartitionInfo(common_partition_infos.back(), current_schema_id, name_and_type_by_source_id));
}
data_files = std::vector<std::string>(files.begin(), files.end());
return data_files;
return pruning_processor.getDataFiles(
common_partition_infos, specific_partition_infos, nullptr, getContext(), configuration_ptr->getPath());
}
}
#endif

View File

@ -0,0 +1,307 @@
#pragma once
#include "Columns/IColumn.h"
#include "Core/NamesAndTypes.h"
#include "DataTypes/DataTypeNullable.h"
#include "config.h"
#if USE_AVRO /// StorageIceberg depending on Avro to parse metadata with Avro format.
# include <Core/Types.h>
# include <Disks/ObjectStorages/IObjectStorage.h>
# include <Interpreters/Context_fwd.h>
# include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>
# include <Storages/ObjectStorage/StorageObjectStorage.h>
namespace DB
{
/**
* Useful links:
* - https://iceberg.apache.org/spec/
*
* Iceberg has two format versions, v1 and v2. The content of metadata files depends on the version.
*
* Unlike DeltaLake, Iceberg has several metadata layers: `table metadata`, `manifest list` and `manifest_files`.
* Metadata file - json file.
* Manifest list an Avro file that lists manifest files; one per snapshot.
* Manifest file an Avro file that lists data or delete files; a subset of a snapshot.
* All changes to table state create a new metadata file and replace the old metadata with an atomic swap.
*
* In order to find out which data files to read, we need to find the `manifest list`
* which corresponds to the latest snapshot. We find it by checking a list of snapshots
* in metadata's "snapshots" section.
*
* Example of metadata.json file.
* {
* "format-version" : 1,
* "table-uuid" : "ca2965ad-aae2-4813-8cf7-2c394e0c10f5",
* "location" : "/iceberg_data/db/table_name",
* "last-updated-ms" : 1680206743150,
* "last-column-id" : 2,
* "schema" : { "type" : "struct", "schema-id" : 0, "fields" : [ {<field1_info>}, {<field2_info>}, ... ] },
* "current-schema-id" : 0,
* "schemas" : [ ],
* ...
* "current-snapshot-id" : 2819310504515118887,
* "refs" : { "main" : { "snapshot-id" : 2819310504515118887, "type" : "branch" } },
* "snapshots" : [ {
* "snapshot-id" : 2819310504515118887,
* "timestamp-ms" : 1680206743150,
* "summary" : {
* "operation" : "append", "spark.app.id" : "local-1680206733239",
* "added-data-files" : "1", "added-records" : "100",
* "added-files-size" : "1070", "changed-partition-count" : "1",
* "total-records" : "100", "total-files-size" : "1070", "total-data-files" : "1", "total-delete-files" : "0",
* "total-position-deletes" : "0", "total-equality-deletes" : "0"
* },
* "manifest-list" : "/iceberg_data/db/table_name/metadata/snap-2819310504515118887-1-c87bfec7-d36c-4075-ad04-600b6b0f2020.avro",
* "schema-id" : 0
* } ],
* "statistics" : [ ],
* "snapshot-log" : [ ... ],
* "metadata-log" : [ ]
* }
*/
enum class PartitionTransform
{
Year,
Month,
Day,
Hour,
Unsupported
};
enum class ManifestEntryStatus : uint8_t
{
EXISTING = 0,
ADDED = 1,
DELETED = 2,
};
enum class DataFileContent : uint8_t
{
DATA = 0,
POSITION_DELETES = 1,
EQUALITY_DELETES = 2,
};
struct CommonPartitionInfo
{
std::vector<ColumnPtr> partition_columns;
std::vector<PartitionTransform> partition_transforms;
std::vector<Int32> partition_source_ids;
ColumnPtr file_path_column;
ColumnPtr status_column;
};
struct SpecificSchemaPartitionInfo
{
std::vector<std::vector<Range>> ranges;
NamesAndTypesList partition_names_and_types;
};
class PartitionPruningProcessor
{
public:
CommonPartitionInfo
getCommonPartitionInfo(const Poco::JSON::Array::Ptr & partition_specification, const ColumnTuple * big_partition_tuple) const;
SpecificSchemaPartitionInfo getSpecificPartitionInfo(
const CommonPartitionInfo & common_info,
Int32 schema_version,
const std::unordered_map<Int32, NameAndTypePair> & name_and_type_by_source_id) const;
std::vector<bool>
getPruningMask(const SpecificSchemaPartitionInfo & specific_info, const ActionsDAG * filter_dag, ContextPtr context) const;
Strings getDataFiles(
const std::vector<CommonPartitionInfo> & manifest_partitions_infos,
const std::vector<SpecificSchemaPartitionInfo> & specific_infos,
const ActionsDAG * filter_dag,
ContextPtr context,
const std::string & common_path) const;
private:
static PartitionTransform getTransform(const String & transform_name)
{
if (transform_name == "year")
{
return PartitionTransform::Year;
}
else if (transform_name == "month")
{
return PartitionTransform::Month;
}
else if (transform_name == "day")
{
return PartitionTransform::Day;
}
else
{
return PartitionTransform::Unsupported;
}
}
static DateLUTImpl::Values getValues(Int32 value, PartitionTransform transform)
{
if (transform == PartitionTransform::Year)
{
return DateLUT::instance().lutIndexByYearSinceEpochStartsZeroIndexing(value);
}
else if (transform == PartitionTransform::Month)
{
return DateLUT::instance().lutIndexByMonthSinceEpochStartsZeroIndexing(static_cast<UInt32>(value));
}
else if (transform == PartitionTransform::Day)
{
return DateLUT::instance().getValues(static_cast<UInt16>(value));
}
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported partition transform for get day function: {}", transform);
}
static Int64 getTime(Int32 value, PartitionTransform transform)
{
DateLUTImpl::Values values = getValues(value, transform);
// LOG_DEBUG(&Poco::Logger::get("Get field"), "Values: {}", values);
return values.date;
}
static Int16 getDay(Int32 value, PartitionTransform transform)
{
DateLUTImpl::Time got_time = getTime(value, transform);
// LOG_DEBUG(&Poco::Logger::get("Get field"), "Time: {}", got_time);
return DateLUT::instance().toDayNum(got_time);
}
static Range
getPartitionRange(PartitionTransform partition_transform, UInt32 index, ColumnPtr partition_column, DataTypePtr column_data_type)
{
if (partition_transform == PartitionTransform::Year || partition_transform == PartitionTransform::Month
|| partition_transform == PartitionTransform::Day)
{
auto column = dynamic_cast<const ColumnNullable *>(partition_column.get())->getNestedColumnPtr();
const auto * casted_innner_column = assert_cast<const ColumnInt32 *>(column.get());
Int32 value = static_cast<Int32>(casted_innner_column->getInt(index));
LOG_DEBUG(&Poco::Logger::get("Partition"), "Partition value: {}, transform: {}", value, partition_transform);
auto nested_data_type = dynamic_cast<const DataTypeNullable *>(column_data_type.get())->getNestedType();
if (WhichDataType(nested_data_type).isDate())
{
const UInt16 begin_range_value = getDay(value, partition_transform);
const UInt16 end_range_value = getDay(value + 1, partition_transform);
return Range{begin_range_value, true, end_range_value, false};
}
else if (WhichDataType(nested_data_type).isDateTime64())
{
const UInt64 begin_range_value = getTime(value, partition_transform);
const UInt64 end_range_value = getTime(value + 1, partition_transform);
return Range{begin_range_value, true, end_range_value, false};
}
else
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Partition transform {} is not supported for the type: {}",
partition_transform,
nested_data_type);
}
}
else
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported partition transform: {}", partition_transform);
}
}
};
class IcebergMetadata : public IDataLakeMetadata, private WithContext
{
public:
using ConfigurationObserverPtr = StorageObjectStorage::ConfigurationObserverPtr;
static constexpr auto name = "Iceberg";
enum class PartitionTransform
{
Year,
Month,
Day,
Hour,
Unsupported
};
IcebergMetadata(
ObjectStoragePtr object_storage_,
ConfigurationObserverPtr configuration_,
ContextPtr context_,
Int32 metadata_version_,
Int32 format_version_,
String manifest_list_file_,
Int32 current_schema_id_,
NamesAndTypesList schema_);
/// Get data files. On first request it reads manifest_list file and iterates through manifest files to find all data files.
/// All subsequent calls will return saved list of files (because it cannot be changed without changing metadata file)
Strings getDataFiles() const override;
/// Get table schema parsed from metadata.
NamesAndTypesList getTableSchema() const override { return schema; }
const std::unordered_map<String, String> & getColumnNameToPhysicalNameMapping() const override { return column_name_to_physical_name; }
const DataLakePartitionColumns & getPartitionColumns() const override { return partition_columns; }
bool operator==(const IDataLakeMetadata & other) const override
{
const auto * iceberg_metadata = dynamic_cast<const IcebergMetadata *>(&other);
return iceberg_metadata && getVersion() == iceberg_metadata->getVersion();
}
static DataLakeMetadataPtr create(ObjectStoragePtr object_storage, ConfigurationObserverPtr configuration, ContextPtr local_context);
Strings makePartitionPruning(const ActionsDAG & filter_dag) override
{
auto configuration_ptr = configuration.lock();
if (!configuration_ptr)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Configuration is expired");
}
return pruning_processor.getDataFiles(
common_partition_infos, specific_partition_infos, &filter_dag, getContext(), configuration_ptr->getPath());
}
bool supportsPartitionPruning() override { return true; }
private:
size_t getVersion() const { return metadata_version; }
const ObjectStoragePtr object_storage;
const ConfigurationObserverPtr configuration;
Int32 metadata_version;
Int32 format_version;
String manifest_list_file;
Int32 current_schema_id;
NamesAndTypesList schema;
std::unordered_map<String, String> column_name_to_physical_name;
DataLakePartitionColumns partition_columns;
LoggerPtr log;
std::unordered_map<Int32, NameAndTypePair> name_and_type_by_source_id;
std::vector<std::pair<String, Int32>> manifest_files_with_start_index;
mutable Strings data_files;
std::vector<CommonPartitionInfo> partition_infos;
mutable Strings manifest_files;
PartitionPruningProcessor pruning_processor;
mutable std::vector<CommonPartitionInfo> common_partition_infos;
mutable std::vector<SpecificSchemaPartitionInfo> specific_partition_infos;
};
}
#endif

View File

@ -0,0 +1,177 @@
#include <exception>
#include "Common/DateLUT.h"
#include "Core/NamesAndTypes.h"
#include "config.h"
#if USE_AVRO
# include <Columns/ColumnString.h>
# include <Columns/ColumnTuple.h>
# include <Columns/IColumn.h>
# include <Core/Settings.h>
# include <DataTypes/DataTypeArray.h>
# include <DataTypes/DataTypeDate.h>
# include <DataTypes/DataTypeDateTime64.h>
# include <DataTypes/DataTypeFactory.h>
# include <DataTypes/DataTypeFixedString.h>
# include <DataTypes/DataTypeMap.h>
# include <DataTypes/DataTypeNullable.h>
# include <DataTypes/DataTypeString.h>
# include <DataTypes/DataTypeTuple.h>
# include <DataTypes/DataTypeUUID.h>
# include <DataTypes/DataTypesDecimal.h>
# include <DataTypes/DataTypesNumber.h>
# include <Formats/FormatFactory.h>
# include <IO/ReadBufferFromFileBase.h>
# include <IO/ReadBufferFromString.h>
# include <IO/ReadHelpers.h>
# include <Processors/Formats/Impl/AvroRowInputFormat.h>
# include <Storages/ObjectStorage/DataLakes/Common.h>
# include <Storages/ObjectStorage/StorageObjectStorageSource.h>
# include <Common/logger_useful.h>
# include <Poco/JSON/Array.h>
# include <Poco/JSON/Object.h>
# include <Poco/JSON/Parser.h>
# include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h>
# include <DataFile.hh>
namespace DB
{
CommonPartitionInfo PartitionPruningProcessor::getCommonPartitionInfo(
const Poco::JSON::Array::Ptr & partition_specification, const ColumnTuple * data_file_tuple_column) const
{
CommonPartitionInfo common_info;
ColumnPtr big_partition_column = data_file_tuple_column->getColumnPtr(data_file_tuple_type.getPositionByName("partition"));
common_info.file_path_column = big_partition_column->getColumnPtr(0);
for (size_t i = 0; i != partition_specification->size(); ++i)
{
auto current_field = partition_specification->getObject(static_cast<UInt32>(i));
auto source_id = current_field->getValue<Int32>("source-id");
PartitionTransform transform = getTransform(current_field->getValue<String>("transform"));
if (transform == PartitionTransform::Unsupported)
{
continue;
}
auto partition_name = current_field->getValue<String>("name");
LOG_DEBUG(&Poco::Logger::get("Partition Spec"), "Name: {}", partition_name);
common_info.partition_columns.push_back(big_partition_column->getColumnPtr(i));
common_info.partition_transforms.push_back(transform);
common_info.partition_source_ids.push_back(source_id);
}
return common_info;
}
SpecificSchemaPartitionInfo PartitionPruningProcessor::getSpecificPartitionInfo(
const CommonPartitionInfo & common_info,
[[maybe_unused]] Int32 schema_version,
const std::unordered_map<Int32, NameAndTypePair> & name_and_type_by_source_id) const
{
SpecificSchemaPartitionInfo specific_info;
for (size_t i = 0; i < common_info.partition_columns.size(); ++i)
{
Int32 source_id = common_info.partition_source_ids[i];
auto it = name_and_type_by_source_id.find(source_id);
if (it == name_and_type_by_source_id.end())
{
continue;
}
size_t column_size = common_info.partition_columns[i]->size();
if (specific_info.ranges.empty())
{
specific_info.ranges.resize(column_size);
}
else
{
assert(specific_info.ranges.size() == column_size);
}
NameAndTypePair name_and_type = it->second;
specific_info.partition_names_and_types.push_back(name_and_type);
for (size_t j = 0; j < column_size; ++j)
{
specific_info.ranges[j].push_back(getPartitionRange(
common_info.partition_transforms[i], static_cast<UInt32>(j), common_info.partition_columns[i], name_and_type.type));
}
}
return specific_info;
}
std::vector<bool> PartitionPruningProcessor::getPruningMask(
const SpecificSchemaPartitionInfo & specific_info, const ActionsDAG * filter_dag, ContextPtr context) const
{
std::vector<bool> pruning_mask;
if (!specific_info.partition_names_and_types.empty())
{
ExpressionActionsPtr partition_minmax_idx_expr = std::make_shared<ExpressionActions>(
ActionsDAG(specific_info.partition_names_and_types), ExpressionActionsSettings::fromContext(context));
const KeyCondition partition_key_condition(
filter_dag, context, specific_info.partition_names_and_types.getNames(), partition_minmax_idx_expr);
for (size_t j = 0; j < specific_info.ranges.size(); ++j)
{
if (!partition_key_condition.checkInHyperrectangle(specific_info.ranges[j], specific_info.partition_names_and_types.getTypes())
.can_be_true)
{
LOG_DEBUG(&Poco::Logger::get("Partition pruning"), "Partition pruning was successful for file: {}", j);
pruning_mask.push_back(false);
}
else
{
LOG_DEBUG(&Poco::Logger::get("Partition pruning"), "Partition pruning failed for file: {}", j);
pruning_mask.push_back(true);
}
}
}
return pruning_mask;
}
Strings PartitionPruningProcessor::getDataFiles(
const std::vector<CommonPartitionInfo> & manifest_partitions_infos,
const std::vector<SpecificSchemaPartitionInfo> & specific_infos,
const ActionsDAG * filter_dag,
ContextPtr context,
const std::string & common_path) const
{
Strings data_files;
for (size_t index = 0; index < manifest_partitions_infos.size(); ++index)
{
const auto & manifest_partition_info = manifest_partitions_infos[index];
const auto & specific_partition_info = specific_infos[index];
size_t number_of_files_in_manifest = manifest_partition_info.file_path_column->size();
LOG_DEBUG(&Poco::Logger::get("Partition pruning"), "Filter dag is null: {}", filter_dag == nullptr);
auto pruning_mask = filter_dag ? getPruningMask(specific_partition_info, filter_dag, context) : std::vector<bool>{};
for (size_t i = 0; i < number_of_files_in_manifest; ++i)
{
if (!filter_dag || pruning_mask[i])
{
const auto status = manifest_partition_info.status_column->getInt(i);
const auto data_path = std::string(manifest_partition_info.file_path_column->getDataAt(i).toView());
const auto pos = data_path.find(common_path);
if (pos == std::string::npos)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected to find {} in data path: {}", common_path, data_path);
const auto file_path = data_path.substr(pos);
if (ManifestEntryStatus(status) == ManifestEntryStatus::DELETED)
{
throw Exception(
ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: positional and equality deletes are not supported");
}
else
{
data_files.push_back(file_path);
}
}
}
}
return data_files;
}
}
#endif

View File

@ -1,117 +0,0 @@
#pragma once
#include "config.h"
#if USE_AVRO /// StorageIceberg depending on Avro to parse metadata with Avro format.
#include <Interpreters/Context_fwd.h>
#include <Core/Types.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>
namespace DB
{
/**
* Useful links:
* - https://iceberg.apache.org/spec/
*
* Iceberg has two format versions, v1 and v2. The content of metadata files depends on the version.
*
* Unlike DeltaLake, Iceberg has several metadata layers: `table metadata`, `manifest list` and `manifest_files`.
* Metadata file - json file.
* Manifest list an Avro file that lists manifest files; one per snapshot.
* Manifest file an Avro file that lists data or delete files; a subset of a snapshot.
* All changes to table state create a new metadata file and replace the old metadata with an atomic swap.
*
* In order to find out which data files to read, we need to find the `manifest list`
* which corresponds to the latest snapshot. We find it by checking a list of snapshots
* in metadata's "snapshots" section.
*
* Example of metadata.json file.
* {
* "format-version" : 1,
* "table-uuid" : "ca2965ad-aae2-4813-8cf7-2c394e0c10f5",
* "location" : "/iceberg_data/db/table_name",
* "last-updated-ms" : 1680206743150,
* "last-column-id" : 2,
* "schema" : { "type" : "struct", "schema-id" : 0, "fields" : [ {<field1_info>}, {<field2_info>}, ... ] },
* "current-schema-id" : 0,
* "schemas" : [ ],
* ...
* "current-snapshot-id" : 2819310504515118887,
* "refs" : { "main" : { "snapshot-id" : 2819310504515118887, "type" : "branch" } },
* "snapshots" : [ {
* "snapshot-id" : 2819310504515118887,
* "timestamp-ms" : 1680206743150,
* "summary" : {
* "operation" : "append", "spark.app.id" : "local-1680206733239",
* "added-data-files" : "1", "added-records" : "100",
* "added-files-size" : "1070", "changed-partition-count" : "1",
* "total-records" : "100", "total-files-size" : "1070", "total-data-files" : "1", "total-delete-files" : "0",
* "total-position-deletes" : "0", "total-equality-deletes" : "0"
* },
* "manifest-list" : "/iceberg_data/db/table_name/metadata/snap-2819310504515118887-1-c87bfec7-d36c-4075-ad04-600b6b0f2020.avro",
* "schema-id" : 0
* } ],
* "statistics" : [ ],
* "snapshot-log" : [ ... ],
* "metadata-log" : [ ]
* }
*/
class IcebergMetadata : public IDataLakeMetadata, private WithContext
{
public:
using ConfigurationObserverPtr = StorageObjectStorage::ConfigurationObserverPtr;
static constexpr auto name = "Iceberg";
IcebergMetadata(
ObjectStoragePtr object_storage_,
ConfigurationObserverPtr configuration_,
ContextPtr context_,
Int32 metadata_version_,
Int32 format_version_,
String manifest_list_file_,
Int32 current_schema_id_,
NamesAndTypesList schema_);
/// Get data files. On first request it reads manifest_list file and iterates through manifest files to find all data files.
/// All subsequent calls will return saved list of files (because it cannot be changed without changing metadata file)
Strings getDataFiles() const override;
/// Get table schema parsed from metadata.
NamesAndTypesList getTableSchema() const override { return schema; }
const std::unordered_map<String, String> & getColumnNameToPhysicalNameMapping() const override { return column_name_to_physical_name; }
const DataLakePartitionColumns & getPartitionColumns() const override { return partition_columns; }
bool operator ==(const IDataLakeMetadata & other) const override
{
const auto * iceberg_metadata = dynamic_cast<const IcebergMetadata *>(&other);
return iceberg_metadata && getVersion() == iceberg_metadata->getVersion();
}
static DataLakeMetadataPtr create(ObjectStoragePtr object_storage, ConfigurationObserverPtr configuration, ContextPtr local_context);
private:
size_t getVersion() const { return metadata_version; }
const ObjectStoragePtr object_storage;
const ConfigurationObserverPtr configuration;
Int32 metadata_version;
Int32 format_version;
String manifest_list_file;
Int32 current_schema_id;
NamesAndTypesList schema;
mutable Strings data_files;
std::unordered_map<String, String> column_name_to_physical_name;
DataLakePartitionColumns partition_columns;
LoggerPtr log;
};
}
#endif

View File

@ -195,7 +195,10 @@ public:
SourceStepWithFilter::applyFilters(std::move(added_filter_nodes));
const ActionsDAG::Node * predicate = nullptr;
if (filter_actions_dag)
{
configuration->implementPartitionPruning(*filter_actions_dag);
predicate = filter_actions_dag->getOutputs().at(0);
}
createIterator(predicate);
}

View File

@ -204,6 +204,8 @@ public:
virtual bool isDataLakeConfiguration() const { return false; }
virtual void implementPartitionPruning(const ActionsDAG &) { }
virtual ReadFromFormatInfo prepareReadingFromFormat(
ObjectStoragePtr object_storage,
const Strings & requested_columns,

View File

@ -92,6 +92,10 @@ StorageObjectStorageSource::~StorageObjectStorageSource()
void StorageObjectStorageSource::setKeyCondition(const std::optional<ActionsDAG> & filter_actions_dag, ContextPtr context_)
{
setKeyConditionImpl(filter_actions_dag, context_, read_from_format_info.format_header);
if (filter_actions_dag.has_value())
{
configuration->implementPartitionPruning(filter_actions_dag.value());
}
}
std::string StorageObjectStorageSource::getUniqueStoragePathIdentifier(

View File

@ -7,6 +7,7 @@ import os.path as p
import platform
import pprint
import pwd
import random
import re
import shlex
import shutil
@ -1650,6 +1651,8 @@ class ClickHouseCluster:
minio_certs_dir=None,
minio_data_dir=None,
use_keeper=True,
keeper_randomize_feature_flags=True,
keeper_required_feature_flags=[],
main_config_name="config.xml",
users_config_name="users.xml",
copy_common_configs=True,
@ -1682,6 +1685,8 @@ class ClickHouseCluster:
if not env_variables:
env_variables = {}
self.use_keeper = use_keeper
self.keeper_randomize_feature_flags = keeper_randomize_feature_flags
self.keeper_required_feature_flags = keeper_required_feature_flags
# Code coverage files will be placed in database directory
# (affect only WITH_COVERAGE=1 build)
@ -2828,15 +2833,51 @@ class ClickHouseCluster:
if self.use_keeper: # TODO: remove hardcoded paths from here
for i in range(1, 4):
current_keeper_config_dir = os.path.join(
f"{self.keeper_instance_dir_prefix}{i}", "config"
)
shutil.copy(
os.path.join(
self.keeper_config_dir, f"keeper_config{i}.xml"
),
os.path.join(
self.keeper_instance_dir_prefix + f"{i}", "config"
),
current_keeper_config_dir,
)
extra_configs_dir = os.path.join(
current_keeper_config_dir, f"keeper_config{i}.d"
)
os.mkdir(extra_configs_dir)
feature_flags_config = os.path.join(
extra_configs_dir, "feature_flags.yaml"
)
indentation = 4 * " "
def get_feature_flag_value(feature_flag):
if not self.keeper_randomize_feature_flags:
return 1
if feature_flag in self.keeper_required_feature_flags:
return 1
return random.randint(0, 1)
with open(feature_flags_config, "w") as ff_config:
ff_config.write("keeper_server:\n")
ff_config.write(f"{indentation}feature_flags:\n")
indentation *= 2
for feature_flag in [
"filtered_list",
"multi_read",
"check_not_exists",
"create_if_not_exists",
"remove_recursive",
]:
ff_config.write(
f"{indentation}{feature_flag}: {get_feature_flag_value(feature_flag)}\n"
)
run_and_check(self.base_zookeeper_cmd + common_opts, env=self.env)
self.up_called = True

View File

@ -13,6 +13,7 @@ node = cluster.add_instance(
main_configs=["configs/enable_keeper_map.xml"],
user_configs=["configs/keeper_retries.xml"],
with_zookeeper=True,
keeper_required_feature_flags=["multi_read"],
stay_alive=True,
)

View File

@ -20,6 +20,7 @@ node1 = cluster.add_instance(
main_configs=["configs/config.xml"],
user_configs=["configs/users.xml"],
with_zookeeper=True,
keeper_required_feature_flags=["multi_read", "create_if_not_exists"],
macros={"shard": "shard1", "replica": "1"},
stay_alive=True,
)
@ -28,6 +29,7 @@ node2 = cluster.add_instance(
main_configs=["configs/config.xml"],
user_configs=["configs/users.xml"],
with_zookeeper=True,
keeper_required_feature_flags=["multi_read", "create_if_not_exists"],
macros={"shard": "shard1", "replica": "2"},
)
nodes = [node1, node2]

View File

@ -59,6 +59,9 @@ instance = cluster.add_instance(
user_configs=["configs/users.xml"],
with_kafka=True,
with_zookeeper=True, # For Replicated Table
keeper_required_feature_flags=[
"create_if_not_exists"
], # new Kafka doesn't work without this feature
macros={
"kafka_broker": "kafka1",
"kafka_topic_old": KAFKA_TOPIC_OLD,

View File

@ -99,6 +99,7 @@ def started_cluster():
with_minio=True,
with_azurite=True,
with_zookeeper=True,
keeper_required_feature_flags=["create_if_not_exists"],
main_configs=[
"configs/zookeeper.xml",
"configs/s3queue_log.xml",
@ -110,6 +111,7 @@ def started_cluster():
user_configs=["configs/users.xml"],
with_minio=True,
with_zookeeper=True,
keeper_required_feature_flags=["create_if_not_exists"],
main_configs=[
"configs/s3queue_log.xml",
],
@ -118,6 +120,7 @@ def started_cluster():
cluster.add_instance(
"old_instance",
with_zookeeper=True,
keeper_required_feature_flags=["create_if_not_exists"],
image="clickhouse/clickhouse-server",
tag="23.12",
stay_alive=True,
@ -127,6 +130,7 @@ def started_cluster():
cluster.add_instance(
"node1",
with_zookeeper=True,
keeper_required_feature_flags=["create_if_not_exists"],
stay_alive=True,
main_configs=[
"configs/zookeeper.xml",
@ -137,6 +141,7 @@ def started_cluster():
cluster.add_instance(
"node2",
with_zookeeper=True,
keeper_required_feature_flags=["create_if_not_exists"],
stay_alive=True,
main_configs=[
"configs/zookeeper.xml",
@ -149,6 +154,7 @@ def started_cluster():
user_configs=["configs/users.xml"],
with_minio=True,
with_zookeeper=True,
keeper_required_feature_flags=["create_if_not_exists"],
main_configs=[
"configs/s3queue_log.xml",
"configs/merge_tree.xml",
@ -158,6 +164,7 @@ def started_cluster():
cluster.add_instance(
"instance_24.5",
with_zookeeper=True,
keeper_required_feature_flags=["create_if_not_exists"],
image="clickhouse/clickhouse-server",
tag="24.5",
stay_alive=True,
@ -170,6 +177,7 @@ def started_cluster():
cluster.add_instance(
"node_cloud_mode",
with_zookeeper=True,
keeper_required_feature_flags=["create_if_not_exists"],
stay_alive=True,
main_configs=[
"configs/zookeeper.xml",

View File

@ -96,13 +96,15 @@ SELECT 'First JOIN INNER second JOIN INNER';
First JOIN INNER second JOIN INNER
SELECT id AS using_id, toTypeName(using_id), t1.id AS t1_id, toTypeName(t1_id), t1.value AS t1_value, toTypeName(t1_value),
t2.id AS t2_id, toTypeName(t2_id), t2.value AS t2_value, toTypeName(t2_value), t3.id AS t3_id, toTypeName(t3_id), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 INNER JOIN test_table_join_2 AS t2 USING (id) INNER JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 INNER JOIN test_table_join_2 AS t2 USING (id) INNER JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
0 UInt64 0 UInt64 Join_1_Value_0 String 0 UInt64 Join_2_Value_0 String 0 UInt64 Join_3_Value_0 String
1 UInt64 1 UInt64 Join_1_Value_1 String 1 UInt64 Join_2_Value_1 String 1 UInt64 Join_3_Value_1 String
SELECT '--';
--
SELECT t1.value AS t1_value, toTypeName(t1_value), t2.value AS t2_value, toTypeName(t2_value), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 INNER JOIN test_table_join_2 AS t2 USING (id) INNER JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 INNER JOIN test_table_join_2 AS t2 USING (id) INNER JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
Join_1_Value_0 String Join_2_Value_0 String Join_3_Value_0 String
Join_1_Value_1 String Join_2_Value_1 String Join_3_Value_1 String
SELECT '--';
@ -115,13 +117,15 @@ SELECT 'First JOIN INNER second JOIN LEFT';
First JOIN INNER second JOIN LEFT
SELECT id AS using_id, toTypeName(using_id), t1.id AS t1_id, toTypeName(t1_id), t1.value AS t1_value, toTypeName(t1_value),
t2.id AS t2_id, toTypeName(t2_id), t2.value AS t2_value, toTypeName(t2_value), t3.id AS t3_id, toTypeName(t3_id), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 INNER JOIN test_table_join_2 AS t2 USING (id) LEFT JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 INNER JOIN test_table_join_2 AS t2 USING (id) LEFT JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
0 UInt64 0 UInt64 Join_1_Value_0 String 0 UInt64 Join_2_Value_0 String 0 UInt64 Join_3_Value_0 String
1 UInt64 1 UInt64 Join_1_Value_1 String 1 UInt64 Join_2_Value_1 String 1 UInt64 Join_3_Value_1 String
SELECT '--';
--
SELECT t1.value AS t1_value, toTypeName(t1_value), t2.value AS t2_value, toTypeName(t2_value), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 INNER JOIN test_table_join_2 AS t2 USING (id) LEFT JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 INNER JOIN test_table_join_2 AS t2 USING (id) LEFT JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
Join_1_Value_0 String Join_2_Value_0 String Join_3_Value_0 String
Join_1_Value_1 String Join_2_Value_1 String Join_3_Value_1 String
SELECT '--';
@ -134,17 +138,19 @@ SELECT 'First JOIN INNER second JOIN RIGHT';
First JOIN INNER second JOIN RIGHT
SELECT id AS using_id, toTypeName(using_id), t1.id AS t1_id, toTypeName(t1_id), t1.value AS t1_value, toTypeName(t1_value),
t2.id AS t2_id, toTypeName(t2_id), t2.value AS t2_value, toTypeName(t2_value), t3.id AS t3_id, toTypeName(t3_id), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 INNER JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 INNER JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
0 UInt64 0 UInt64 Join_1_Value_0 String 0 UInt64 Join_2_Value_0 String 0 UInt64 Join_3_Value_0 String
1 UInt64 1 UInt64 Join_1_Value_1 String 1 UInt64 Join_2_Value_1 String 1 UInt64 Join_3_Value_1 String
4 UInt64 0 UInt64 String 0 UInt64 String 4 UInt64 Join_3_Value_4 String
SELECT '--';
--
SELECT t1.value AS t1_value, toTypeName(t1_value), t2.value AS t2_value, toTypeName(t2_value), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 INNER JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 INNER JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
String String Join_3_Value_4 String
Join_1_Value_0 String Join_2_Value_0 String Join_3_Value_0 String
Join_1_Value_1 String Join_2_Value_1 String Join_3_Value_1 String
String String Join_3_Value_4 String
SELECT '--';
--
SELECT 1 FROM test_table_join_1 AS t1 INNER JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id);
@ -156,17 +162,19 @@ SELECT 'First JOIN INNER second JOIN FULL';
First JOIN INNER second JOIN FULL
SELECT id AS using_id, toTypeName(using_id), t1.id AS t1_id, toTypeName(t1_id), t1.value AS t1_value, toTypeName(t1_value),
t2.id AS t2_id, toTypeName(t2_id), t2.value AS t2_value, toTypeName(t2_value), t3.id AS t3_id, toTypeName(t3_id), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 INNER JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 INNER JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
0 UInt64 0 UInt64 String 0 UInt64 String 4 UInt64 Join_3_Value_4 String
0 UInt64 0 UInt64 Join_1_Value_0 String 0 UInt64 Join_2_Value_0 String 0 UInt64 Join_3_Value_0 String
1 UInt64 1 UInt64 Join_1_Value_1 String 1 UInt64 Join_2_Value_1 String 1 UInt64 Join_3_Value_1 String
0 UInt64 0 UInt64 String 0 UInt64 String 4 UInt64 Join_3_Value_4 String
SELECT '--';
--
SELECT t1.value AS t1_value, toTypeName(t1_value), t2.value AS t2_value, toTypeName(t2_value), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 INNER JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 INNER JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
String String Join_3_Value_4 String
Join_1_Value_0 String Join_2_Value_0 String Join_3_Value_0 String
Join_1_Value_1 String Join_2_Value_1 String Join_3_Value_1 String
String String Join_3_Value_4 String
SELECT '--';
--
SELECT 1 FROM test_table_join_1 AS t1 INNER JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id);
@ -178,13 +186,15 @@ SELECT 'First JOIN LEFT second JOIN INNER';
First JOIN LEFT second JOIN INNER
SELECT id AS using_id, toTypeName(using_id), t1.id AS t1_id, toTypeName(t1_id), t1.value AS t1_value, toTypeName(t1_value),
t2.id AS t2_id, toTypeName(t2_id), t2.value AS t2_value, toTypeName(t2_value), t3.id AS t3_id, toTypeName(t3_id), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 LEFT JOIN test_table_join_2 AS t2 USING (id) INNER JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 LEFT JOIN test_table_join_2 AS t2 USING (id) INNER JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
0 UInt64 0 UInt64 Join_1_Value_0 String 0 UInt64 Join_2_Value_0 String 0 UInt64 Join_3_Value_0 String
1 UInt64 1 UInt64 Join_1_Value_1 String 1 UInt64 Join_2_Value_1 String 1 UInt64 Join_3_Value_1 String
SELECT '--';
--
SELECT t1.value AS t1_value, toTypeName(t1_value), t2.value AS t2_value, toTypeName(t2_value), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 LEFT JOIN test_table_join_2 AS t2 USING (id) INNER JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 LEFT JOIN test_table_join_2 AS t2 USING (id) INNER JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
Join_1_Value_0 String Join_2_Value_0 String Join_3_Value_0 String
Join_1_Value_1 String Join_2_Value_1 String Join_3_Value_1 String
SELECT '--';
@ -197,14 +207,16 @@ SELECT 'First JOIN LEFT second JOIN LEFT';
First JOIN LEFT second JOIN LEFT
SELECT id AS using_id, toTypeName(using_id), t1.id AS t1_id, toTypeName(t1_id), t1.value AS t1_value, toTypeName(t1_value),
t2.id AS t2_id, toTypeName(t2_id), t2.value AS t2_value, toTypeName(t2_value), t3.id AS t3_id, toTypeName(t3_id), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 LEFT JOIN test_table_join_2 AS t2 USING (id) LEFT JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 LEFT JOIN test_table_join_2 AS t2 USING (id) LEFT JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
0 UInt64 0 UInt64 Join_1_Value_0 String 0 UInt64 Join_2_Value_0 String 0 UInt64 Join_3_Value_0 String
1 UInt64 1 UInt64 Join_1_Value_1 String 1 UInt64 Join_2_Value_1 String 1 UInt64 Join_3_Value_1 String
2 UInt64 2 UInt64 Join_1_Value_2 String 0 UInt64 String 0 UInt64 String
SELECT '--';
--
SELECT t1.value AS t1_value, toTypeName(t1_value), t2.value AS t2_value, toTypeName(t2_value), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 LEFT JOIN test_table_join_2 AS t2 USING (id) LEFT JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 LEFT JOIN test_table_join_2 AS t2 USING (id) LEFT JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
Join_1_Value_0 String Join_2_Value_0 String Join_3_Value_0 String
Join_1_Value_1 String Join_2_Value_1 String Join_3_Value_1 String
Join_1_Value_2 String String String
@ -219,17 +231,19 @@ SELECT 'First JOIN LEFT second JOIN RIGHT';
First JOIN LEFT second JOIN RIGHT
SELECT id AS using_id, toTypeName(using_id), t1.id AS t1_id, toTypeName(t1_id), t1.value AS t1_value, toTypeName(t1_value),
t2.id AS t2_id, toTypeName(t2_id), t2.value AS t2_value, toTypeName(t2_value), t3.id AS t3_id, toTypeName(t3_id), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 LEFT JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 LEFT JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
0 UInt64 0 UInt64 Join_1_Value_0 String 0 UInt64 Join_2_Value_0 String 0 UInt64 Join_3_Value_0 String
1 UInt64 1 UInt64 Join_1_Value_1 String 1 UInt64 Join_2_Value_1 String 1 UInt64 Join_3_Value_1 String
4 UInt64 0 UInt64 String 0 UInt64 String 4 UInt64 Join_3_Value_4 String
SELECT '--';
--
SELECT t1.value AS t1_value, toTypeName(t1_value), t2.value AS t2_value, toTypeName(t2_value), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 LEFT JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 LEFT JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
String String Join_3_Value_4 String
Join_1_Value_0 String Join_2_Value_0 String Join_3_Value_0 String
Join_1_Value_1 String Join_2_Value_1 String Join_3_Value_1 String
String String Join_3_Value_4 String
SELECT '--';
--
SELECT 1 FROM test_table_join_1 AS t1 LEFT JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id);
@ -241,19 +255,21 @@ SELECT 'First JOIN LEFT second JOIN FULL';
First JOIN LEFT second JOIN FULL
SELECT id AS using_id, toTypeName(using_id), t1.id AS t1_id, toTypeName(t1_id), t1.value AS t1_value, toTypeName(t1_value),
t2.id AS t2_id, toTypeName(t2_id), t2.value AS t2_value, toTypeName(t2_value), t3.id AS t3_id, toTypeName(t3_id), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 LEFT JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 LEFT JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
0 UInt64 0 UInt64 String 0 UInt64 String 4 UInt64 Join_3_Value_4 String
0 UInt64 0 UInt64 Join_1_Value_0 String 0 UInt64 Join_2_Value_0 String 0 UInt64 Join_3_Value_0 String
1 UInt64 1 UInt64 Join_1_Value_1 String 1 UInt64 Join_2_Value_1 String 1 UInt64 Join_3_Value_1 String
2 UInt64 2 UInt64 Join_1_Value_2 String 0 UInt64 String 0 UInt64 String
0 UInt64 0 UInt64 String 0 UInt64 String 4 UInt64 Join_3_Value_4 String
SELECT '--';
--
SELECT t1.value AS t1_value, toTypeName(t1_value), t2.value AS t2_value, toTypeName(t2_value), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 LEFT JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 LEFT JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
String String Join_3_Value_4 String
Join_1_Value_0 String Join_2_Value_0 String Join_3_Value_0 String
Join_1_Value_1 String Join_2_Value_1 String Join_3_Value_1 String
Join_1_Value_2 String String String
String String Join_3_Value_4 String
SELECT '--';
--
SELECT 1 FROM test_table_join_1 AS t1 LEFT JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id);
@ -266,13 +282,15 @@ SELECT 'First JOIN RIGHT second JOIN INNER';
First JOIN RIGHT second JOIN INNER
SELECT id AS using_id, toTypeName(using_id), t1.id AS t1_id, toTypeName(t1_id), t1.value AS t1_value, toTypeName(t1_value),
t2.id AS t2_id, toTypeName(t2_id), t2.value AS t2_value, toTypeName(t2_value), t3.id AS t3_id, toTypeName(t3_id), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) INNER JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) INNER JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
0 UInt64 0 UInt64 Join_1_Value_0 String 0 UInt64 Join_2_Value_0 String 0 UInt64 Join_3_Value_0 String
1 UInt64 1 UInt64 Join_1_Value_1 String 1 UInt64 Join_2_Value_1 String 1 UInt64 Join_3_Value_1 String
SELECT '--';
--
SELECT t1.value AS t1_value, toTypeName(t1_value), t2.value AS t2_value, toTypeName(t2_value), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) INNER JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) INNER JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
Join_1_Value_0 String Join_2_Value_0 String Join_3_Value_0 String
Join_1_Value_1 String Join_2_Value_1 String Join_3_Value_1 String
SELECT '--';
@ -285,17 +303,19 @@ SELECT 'First JOIN RIGHT second JOIN LEFT';
First JOIN RIGHT second JOIN LEFT
SELECT id AS using_id, toTypeName(using_id), t1.id AS t1_id, toTypeName(t1_id), t1.value AS t1_value, toTypeName(t1_value),
t2.id AS t2_id, toTypeName(t2_id), t2.value AS t2_value, toTypeName(t2_value), t3.id AS t3_id, toTypeName(t3_id), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) LEFT JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) LEFT JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
0 UInt64 0 UInt64 Join_1_Value_0 String 0 UInt64 Join_2_Value_0 String 0 UInt64 Join_3_Value_0 String
1 UInt64 1 UInt64 Join_1_Value_1 String 1 UInt64 Join_2_Value_1 String 1 UInt64 Join_3_Value_1 String
3 UInt64 0 UInt64 String 3 UInt64 Join_2_Value_3 String 0 UInt64 String
SELECT '--';
--
SELECT t1.value AS t1_value, toTypeName(t1_value), t2.value AS t2_value, toTypeName(t2_value), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) LEFT JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) LEFT JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
String Join_2_Value_3 String String
Join_1_Value_0 String Join_2_Value_0 String Join_3_Value_0 String
Join_1_Value_1 String Join_2_Value_1 String Join_3_Value_1 String
String Join_2_Value_3 String String
SELECT '--';
--
SELECT 1 FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) LEFT JOIN test_table_join_3 AS t3 USING(id);
@ -307,17 +327,19 @@ SELECT 'First JOIN RIGHT second JOIN RIGHT';
First JOIN RIGHT second JOIN RIGHT
SELECT id AS using_id, toTypeName(using_id), t1.id AS t1_id, toTypeName(t1_id), t1.value AS t1_value, toTypeName(t1_value),
t2.id AS t2_id, toTypeName(t2_id), t2.value AS t2_value, toTypeName(t2_value), t3.id AS t3_id, toTypeName(t3_id), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
0 UInt64 0 UInt64 Join_1_Value_0 String 0 UInt64 Join_2_Value_0 String 0 UInt64 Join_3_Value_0 String
1 UInt64 1 UInt64 Join_1_Value_1 String 1 UInt64 Join_2_Value_1 String 1 UInt64 Join_3_Value_1 String
4 UInt64 0 UInt64 String 0 UInt64 String 4 UInt64 Join_3_Value_4 String
SELECT '--';
--
SELECT t1.value AS t1_value, toTypeName(t1_value), t2.value AS t2_value, toTypeName(t2_value), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
String String Join_3_Value_4 String
Join_1_Value_0 String Join_2_Value_0 String Join_3_Value_0 String
Join_1_Value_1 String Join_2_Value_1 String Join_3_Value_1 String
String String Join_3_Value_4 String
SELECT '--';
--
SELECT 1 FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id);
@ -329,19 +351,21 @@ SELECT 'First JOIN RIGHT second JOIN FULL';
First JOIN RIGHT second JOIN FULL
SELECT id AS using_id, toTypeName(using_id), t1.id AS t1_id, toTypeName(t1_id), t1.value AS t1_value, toTypeName(t1_value),
t2.id AS t2_id, toTypeName(t2_id), t2.value AS t2_value, toTypeName(t2_value), t3.id AS t3_id, toTypeName(t3_id), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
0 UInt64 0 UInt64 String 0 UInt64 String 4 UInt64 Join_3_Value_4 String
0 UInt64 0 UInt64 Join_1_Value_0 String 0 UInt64 Join_2_Value_0 String 0 UInt64 Join_3_Value_0 String
1 UInt64 1 UInt64 Join_1_Value_1 String 1 UInt64 Join_2_Value_1 String 1 UInt64 Join_3_Value_1 String
3 UInt64 0 UInt64 String 3 UInt64 Join_2_Value_3 String 0 UInt64 String
0 UInt64 0 UInt64 String 0 UInt64 String 4 UInt64 Join_3_Value_4 String
SELECT '--';
--
SELECT t1.value AS t1_value, toTypeName(t1_value), t2.value AS t2_value, toTypeName(t2_value), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
String String Join_3_Value_4 String
String Join_2_Value_3 String String
Join_1_Value_0 String Join_2_Value_0 String Join_3_Value_0 String
Join_1_Value_1 String Join_2_Value_1 String Join_3_Value_1 String
String Join_2_Value_3 String String
String String Join_3_Value_4 String
SELECT '--';
--
SELECT 1 FROM test_table_join_1 AS t1 RIGHT JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id);
@ -354,14 +378,16 @@ SELECT 'First JOIN FULL second JOIN INNER';
First JOIN FULL second JOIN INNER
SELECT id AS using_id, toTypeName(using_id), t1.id AS t1_id, toTypeName(t1_id), t1.value AS t1_value, toTypeName(t1_value),
t2.id AS t2_id, toTypeName(t2_id), t2.value AS t2_value, toTypeName(t2_value), t3.id AS t3_id, toTypeName(t3_id), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) INNER JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) INNER JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
0 UInt64 0 UInt64 String 3 UInt64 Join_2_Value_3 String 0 UInt64 Join_3_Value_0 String
0 UInt64 0 UInt64 Join_1_Value_0 String 0 UInt64 Join_2_Value_0 String 0 UInt64 Join_3_Value_0 String
1 UInt64 1 UInt64 Join_1_Value_1 String 1 UInt64 Join_2_Value_1 String 1 UInt64 Join_3_Value_1 String
0 UInt64 0 UInt64 String 3 UInt64 Join_2_Value_3 String 0 UInt64 Join_3_Value_0 String
SELECT '--';
--
SELECT t1.value AS t1_value, toTypeName(t1_value), t2.value AS t2_value, toTypeName(t2_value), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) INNER JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) INNER JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
Join_1_Value_0 String Join_2_Value_0 String Join_3_Value_0 String
Join_1_Value_1 String Join_2_Value_1 String Join_3_Value_1 String
SELECT '--';
@ -374,19 +400,21 @@ SELECT 'First JOIN FULL second JOIN LEFT';
First JOIN FULL second JOIN LEFT
SELECT id AS using_id, toTypeName(using_id), t1.id AS t1_id, toTypeName(t1_id), t1.value AS t1_value, toTypeName(t1_value),
t2.id AS t2_id, toTypeName(t2_id), t2.value AS t2_value, toTypeName(t2_value), t3.id AS t3_id, toTypeName(t3_id), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) LEFT JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) LEFT JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
0 UInt64 0 UInt64 String 3 UInt64 Join_2_Value_3 String 0 UInt64 Join_3_Value_0 String
0 UInt64 0 UInt64 Join_1_Value_0 String 0 UInt64 Join_2_Value_0 String 0 UInt64 Join_3_Value_0 String
1 UInt64 1 UInt64 Join_1_Value_1 String 1 UInt64 Join_2_Value_1 String 1 UInt64 Join_3_Value_1 String
2 UInt64 2 UInt64 Join_1_Value_2 String 0 UInt64 String 0 UInt64 String
0 UInt64 0 UInt64 String 3 UInt64 Join_2_Value_3 String 0 UInt64 Join_3_Value_0 String
SELECT '--';
--
SELECT t1.value AS t1_value, toTypeName(t1_value), t2.value AS t2_value, toTypeName(t2_value), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) LEFT JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) LEFT JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
String Join_2_Value_3 String String
Join_1_Value_0 String Join_2_Value_0 String Join_3_Value_0 String
Join_1_Value_1 String Join_2_Value_1 String Join_3_Value_1 String
Join_1_Value_2 String String String
String Join_2_Value_3 String String
SELECT '--';
--
SELECT 1 FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) LEFT JOIN test_table_join_3 AS t3 USING(id);
@ -399,18 +427,20 @@ SELECT 'First JOIN FULL second JOIN RIGHT';
First JOIN FULL second JOIN RIGHT
SELECT id AS using_id, toTypeName(using_id), t1.id AS t1_id, toTypeName(t1_id), t1.value AS t1_value, toTypeName(t1_value),
t2.id AS t2_id, toTypeName(t2_id), t2.value AS t2_value, toTypeName(t2_value), t3.id AS t3_id, toTypeName(t3_id), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
0 UInt64 0 UInt64 String 3 UInt64 Join_2_Value_3 String 0 UInt64 Join_3_Value_0 String
0 UInt64 0 UInt64 Join_1_Value_0 String 0 UInt64 Join_2_Value_0 String 0 UInt64 Join_3_Value_0 String
1 UInt64 1 UInt64 Join_1_Value_1 String 1 UInt64 Join_2_Value_1 String 1 UInt64 Join_3_Value_1 String
0 UInt64 0 UInt64 String 3 UInt64 Join_2_Value_3 String 0 UInt64 Join_3_Value_0 String
4 UInt64 0 UInt64 String 0 UInt64 String 4 UInt64 Join_3_Value_4 String
SELECT '--';
--
SELECT t1.value AS t1_value, toTypeName(t1_value), t2.value AS t2_value, toTypeName(t2_value), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
String String Join_3_Value_4 String
Join_1_Value_0 String Join_2_Value_0 String Join_3_Value_0 String
Join_1_Value_1 String Join_2_Value_1 String Join_3_Value_1 String
String String Join_3_Value_4 String
SELECT '--';
--
SELECT 1 FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) RIGHT JOIN test_table_join_3 AS t3 USING(id);
@ -422,21 +452,23 @@ SELECT 'First JOIN FULL second JOIN FULL';
First JOIN FULL second JOIN FULL
SELECT id AS using_id, toTypeName(using_id), t1.id AS t1_id, toTypeName(t1_id), t1.value AS t1_value, toTypeName(t1_value),
t2.id AS t2_id, toTypeName(t2_id), t2.value AS t2_value, toTypeName(t2_value), t3.id AS t3_id, toTypeName(t3_id), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
0 UInt64 0 UInt64 String 0 UInt64 String 4 UInt64 Join_3_Value_4 String
0 UInt64 0 UInt64 String 3 UInt64 Join_2_Value_3 String 0 UInt64 Join_3_Value_0 String
0 UInt64 0 UInt64 Join_1_Value_0 String 0 UInt64 Join_2_Value_0 String 0 UInt64 Join_3_Value_0 String
1 UInt64 1 UInt64 Join_1_Value_1 String 1 UInt64 Join_2_Value_1 String 1 UInt64 Join_3_Value_1 String
2 UInt64 2 UInt64 Join_1_Value_2 String 0 UInt64 String 0 UInt64 String
0 UInt64 0 UInt64 String 3 UInt64 Join_2_Value_3 String 0 UInt64 Join_3_Value_0 String
0 UInt64 0 UInt64 String 0 UInt64 String 4 UInt64 Join_3_Value_4 String
SELECT '--';
--
SELECT t1.value AS t1_value, toTypeName(t1_value), t2.value AS t2_value, toTypeName(t2_value), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
String String Join_3_Value_4 String
String Join_2_Value_3 String String
Join_1_Value_0 String Join_2_Value_0 String Join_3_Value_0 String
Join_1_Value_1 String Join_2_Value_1 String Join_3_Value_1 String
Join_1_Value_2 String String String
String Join_2_Value_3 String String
String String Join_3_Value_4 String
SELECT '--';
--
SELECT 1 FROM test_table_join_1 AS t1 FULL JOIN test_table_join_2 AS t2 USING (id) FULL JOIN test_table_join_3 AS t3 USING(id);
View File

@ -64,12 +64,14 @@ SELECT 'First JOIN {{ first_join_type }} second JOIN {{ second_join_type }}';
SELECT id AS using_id, toTypeName(using_id), t1.id AS t1_id, toTypeName(t1_id), t1.value AS t1_value, toTypeName(t1_value),
t2.id AS t2_id, toTypeName(t2_id), t2.value AS t2_value, toTypeName(t2_value), t3.id AS t3_id, toTypeName(t3_id), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 {{ first_join_type }} JOIN test_table_join_2 AS t2 USING (id) {{ second_join_type }} JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 {{ first_join_type }} JOIN test_table_join_2 AS t2 USING (id) {{ second_join_type }} JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
SELECT '--';
SELECT t1.value AS t1_value, toTypeName(t1_value), t2.value AS t2_value, toTypeName(t2_value), t3.value AS t3_value, toTypeName(t3_value)
FROM test_table_join_1 AS t1 {{ first_join_type }} JOIN test_table_join_2 AS t2 USING (id) {{ second_join_type }} JOIN test_table_join_3 AS t3 USING(id);
FROM test_table_join_1 AS t1 {{ first_join_type }} JOIN test_table_join_2 AS t2 USING (id) {{ second_join_type }} JOIN test_table_join_3 AS t3 USING(id)
ORDER BY ALL;
SELECT '--';