initial commit for Hive-style partitioning

yariks5s 2024-07-02 14:01:19 +00:00
parent 73ab582c26
commit 04b8b1e76c
29 changed files with 749 additions and 10 deletions

View File

@@ -1106,6 +1106,11 @@ class IColumn;
M(Bool, input_format_tsv_skip_trailing_empty_lines, false, "Skip trailing empty lines in TSV format", 0) \
M(Bool, input_format_custom_skip_trailing_empty_lines, false, "Skip trailing empty lines in CustomSeparated format", 0) \
M(Bool, input_format_tsv_crlf_end_of_line, false, "If it is set true, file function will read TSV format with \\r\\n instead of \\n.", 0) \
M(Bool, file_hive_partitioning, false, "Allows to use Hive-style partitioning for the file engine", 0) \
M(Bool, url_hive_partitioning, false, "Allows to use Hive-style partitioning for the URL engine", 0) \
M(Bool, s3_hive_partitioning, false, "Allows to use Hive-style partitioning for the S3 engine", 0) \
M(Bool, azure_blob_storage_hive_partitioning, false, "Allows to use Hive-style partitioning for the AzureBlobStorage engine", 0) \
M(Bool, hdfs_hive_partitioning, false, "Allows to use Hive-style partitioning for the HDFS engine", 0) \
\
M(Bool, input_format_native_allow_types_conversion, true, "Allow data types conversion in Native input format", 0) \
\
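Note: with one of these settings enabled, every key=value segment of a queried path (e.g. .../column0=Elizabeth/sample.parquet) is surfaced as a virtual column named after the key with a leading underscore (_column0 = 'Elizabeth'); the tests at the end of this diff exercise this for each storage type.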

View File

@@ -118,6 +118,11 @@ static const std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges
{"input_format_csv_deserialize_separate_columns_into_tuple", true, true, "A new way of how interpret tuples in CSV format was added."},
{"input_format_csv_try_infer_strings_from_quoted_tuples", true, true, "A new way of how interpret tuples in CSV format was added."},
{"input_format_json_ignore_key_case", false, false, "Ignore json key case while read json field from string."},
{"file_hive_partitioning", false, false, "A new settings that allows to use hive partitioning for file format."},
{"url_hive_partitioning", false, false, "A new settings that allows to use hive partitioning for url format."},
{"s3_hive_partitioning", false, false, "A new settings that allows to use hive partitioning for s3 format."},
{"azure_blob_storage_hive_partitioning", false, false, "A new settings that allows to use hive partitioning for AzureBlobStorage format."},
{"hdfs_hive_partitioning", false, false, "A new settings that allows to use hive partitioning for hdfs format."},
}},
{"24.5", {{"allow_deprecated_error_prone_window_functions", true, false, "Allow usage of deprecated error prone window functions (neighbor, runningAccumulate, runningDifferenceStartingWithFirstValue, runningDifference)"},
{"allow_experimental_join_condition", false, false, "Support join with inequal conditions which involve columns from both left and right table. e.g. t1.y < t2.y."},

View File

@@ -1,4 +1,5 @@
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Core/ColumnWithTypeAndName.h>
#include <Formats/FormatFactory.h>
#include <Parsers/ASTInsertQuery.h>
@@ -32,6 +33,19 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED;
}
bool checkIfHiveSettingEnabled(const ContextPtr & context, const std::string & storage_type_name)
{
if (storage_type_name == "s3")
return context->getSettings().s3_hive_partitioning;
else if (storage_type_name == "hdfs")
return context->getSettings().hdfs_hive_partitioning;
else if (storage_type_name == "azure")
return context->getSettings().azure_blob_storage_hive_partitioning;
else
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported storage type: {}", storage_type_name);
}
StorageObjectStorage::StorageObjectStorage(
ConfigurationPtr configuration_,
ObjectStoragePtr object_storage_,
@@ -60,7 +74,23 @@ StorageObjectStorage::StorageObjectStorage(
metadata.setConstraints(constraints_);
metadata.setComment(comment);
setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(metadata.getColumns()));
auto file_iterator = StorageObjectStorageSource::createFileIterator(
configuration,
object_storage,
distributed_processing_,
context,
{}, // predicate
metadata.getColumns().getAll(), // virtual_columns
nullptr, // read_keys
{} // file_progress_callback
);
Strings paths;
if (checkIfHiveSettingEnabled(context, configuration->getTypeName()))
if (auto file = file_iterator->next(0))
paths = {file->getPath()};
setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(metadata.getColumns(), paths));
setInMemoryMetadata(metadata);
}

View File

@@ -13,6 +13,7 @@
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/Cache/SchemaCache.h>
#include <Common/parseGlobs.h>
#include <DataTypes/DataTypeString.h>
namespace fs = std::filesystem;
@@ -195,13 +196,24 @@ Chunk StorageObjectStorageSource::generate()
const auto & object_info = reader.getObjectInfo();
const auto & filename = object_info->getFileName();
chassert(object_info->metadata);
auto hive_map = VirtualColumnUtils::parsePartitionMapFromPath(object_info->getPath());
bool contains_virtual_column = std::any_of(hive_map.begin(), hive_map.end(),
[&](const auto& pair) {
return read_from_format_info.requested_virtual_columns.contains(pair.first);
});
if (!contains_virtual_column)
hive_map.clear(); // If none of the requested virtual columns is present in the path, we don't add any of them to the chunk
VirtualColumnUtils::addRequestedFileLikeStorageVirtualsToChunk(
chunk, read_from_format_info.requested_virtual_columns,
{
.path = getUniqueStoragePathIdentifier(*configuration, *object_info, false),
.size = object_info->metadata->size_bytes,
.filename = &filename,
.last_modified = object_info->metadata->last_modified
.last_modified = object_info->metadata->last_modified,
.hive_partitioning_map = hive_map
});
return chunk;
}

View File

@@ -52,6 +52,7 @@
#include <Common/logger_useful.h>
#include <Common/ProfileEvents.h>
#include <Common/re2.h>
#include <Formats/SchemaInferenceUtils.h>
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
@@ -1095,7 +1096,11 @@ void StorageFile::setStorageMetadata(CommonArguments args)
storage_metadata.setConstraints(args.constraints);
storage_metadata.setComment(args.comment);
setInMemoryMetadata(storage_metadata);
setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(storage_metadata.getColumns()));
Strings paths_for_virtuals;
if (args.getContext()->getSettingsRef().file_hive_partitioning)
paths_for_virtuals = paths;
setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(storage_metadata.getColumns(), paths_for_virtuals));
}
@@ -1437,6 +1442,15 @@ Chunk StorageFileSource::generate()
chunk_size = input_format->getApproxBytesReadForChunk();
progress(num_rows, chunk_size ? chunk_size : chunk.bytes());
std::map<std::string, std::string> hive_map;
if (getContext()->getSettingsRef().file_hive_partitioning)
{
hive_map = VirtualColumnUtils::parsePartitionMapFromPath(current_path);
for (const auto& item : hive_map)
requested_virtual_columns.push_back(NameAndTypePair(item.first, std::make_shared<DataTypeString>()));
}
/// Enrich with virtual columns.
VirtualColumnUtils::addRequestedFileLikeStorageVirtualsToChunk(
chunk, requested_virtual_columns,
@@ -1444,7 +1458,8 @@ Chunk StorageFileSource::generate()
.path = current_path,
.size = current_file_size,
.filename = (filename_override.has_value() ? &filename_override.value() : nullptr),
.last_modified = current_file_last_modified
.last_modified = current_file_last_modified,
.hive_partitioning_map = hive_map
});
return chunk;
@@ -1621,6 +1636,16 @@ void ReadFromFile::createIterator(const ActionsDAG::Node * predicate)
storage->distributed_processing);
}
void addPartitionColumnsToInfoHeader(const Strings & paths, ReadFromFormatInfo & info)
{
for (const auto& path : paths)
{
auto map = VirtualColumnUtils::parsePartitionMapFromPath(path);
for (const auto& item : map)
info.source_header.insertUnique(ColumnWithTypeAndName(std::make_shared<DataTypeString>(), item.first));
}
}
void ReadFromFile::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
createIterator(nullptr);
@@ -1628,10 +1653,20 @@ void ReadFromFile::initializePipeline(QueryPipelineBuilder & pipeline, const Bui
size_t num_streams = max_num_streams;
size_t files_to_read = 0;
Strings paths;
if (storage->archive_info)
{
files_to_read = storage->archive_info->paths_to_archives.size();
paths = storage->archive_info->paths_to_archives;
}
else
{
files_to_read = storage->paths.size();
paths = storage->paths;
}
if (getContext()->getSettingsRef().file_hive_partitioning)
addPartitionColumnsToInfoHeader(paths, info);
if (max_num_streams > files_to_read)
num_streams = files_to_read;

View File

@@ -36,6 +36,7 @@
#include <Common/thread_local_rng.h>
#include <Common/logger_useful.h>
#include <Common/re2.h>
#include <Formats/SchemaInferenceUtils.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/HTTPHeaderEntries.h>
@@ -151,7 +152,11 @@ IStorageURLBase::IStorageURLBase(
storage_metadata.setConstraints(constraints_);
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);
setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(storage_metadata.getColumns()));
Strings uri_for_partitioning;
if (context_->getSettingsRef().url_hive_partitioning)
uri_for_partitioning = {uri};
setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(storage_metadata.getColumns(), uri_for_partitioning));
}
@@ -410,12 +415,17 @@ Chunk StorageURLSource::generate()
size_t chunk_size = 0;
if (input_format)
chunk_size = input_format->getApproxBytesReadForChunk();
std::map<std::string, std::string> hive_map;
if (getContext()->getSettingsRef().url_hive_partitioning)
hive_map = VirtualColumnUtils::parsePartitionMapFromPath(curr_uri.getPath());
progress(num_rows, chunk_size ? chunk_size : chunk.bytes());
VirtualColumnUtils::addRequestedFileLikeStorageVirtualsToChunk(
chunk, requested_virtual_columns,
{
.path = curr_uri.getPath(),
.size = current_file_size
.size = current_file_size,
.hive_partitioning_map = hive_map
});
return chunk;
}
@@ -1170,6 +1180,7 @@ void ReadFromURL::createIterator(const ActionsDAG::Node * predicate)
void ReadFromURL::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
createIterator(nullptr);
const auto & settings = context->getSettingsRef();
if (is_empty_glob)
{
@@ -1180,7 +1191,6 @@ void ReadFromURL::initializePipeline(QueryPipelineBuilder & pipeline, const Buil
Pipes pipes;
pipes.reserve(num_streams);
const auto & settings = context->getSettingsRef();
const size_t max_parsing_threads = num_streams >= settings.max_parsing_threads ? 1 : (settings.max_parsing_threads / num_streams);
for (size_t i = 0; i < num_streams; ++i)

View File

@@ -1,4 +1,3 @@
#include <algorithm>
#include <memory>
#include <stack>
#include <Core/NamesAndTypes.h>
@@ -37,6 +36,7 @@
#include <Storages/VirtualColumnUtils.h>
#include <IO/WriteHelpers.h>
#include <Common/re2.h>
#include <Common/typeid_cast.h>
#include "Functions/FunctionsLogical.h"
#include "Functions/IFunction.h"
@@ -115,7 +115,22 @@ NameSet getVirtualNamesForFileLikeStorage()
return {"_path", "_file", "_size", "_time"};
}
VirtualColumnsDescription getVirtualsForFileLikeStorage(const ColumnsDescription & storage_columns)
/// Extract the partition key names ("key" from every ".../key=value/..." segment) of a path.
Strings parseVirtualColumnNameFromPath(const std::string & path)
{
std::string pattern = "/([^/]+)=([^/]+)";
re2::StringPiece input_piece(path);
std::string key;
Strings result;
while (RE2::FindAndConsume(&input_piece, pattern, &key))
result.push_back(key);
return result;
}
VirtualColumnsDescription getVirtualsForFileLikeStorage(const ColumnsDescription & storage_columns, const Strings & paths)
{
VirtualColumnsDescription desc;
@@ -132,6 +147,13 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage(const ColumnsDescription
add_virtual("_size", makeNullable(std::make_shared<DataTypeUInt64>()));
add_virtual("_time", makeNullable(std::make_shared<DataTypeDateTime>()));
for (const auto& path : paths)
{
auto names = parseVirtualColumnNameFromPath(path);
for (const auto& name : names)
add_virtual("_" + name, std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()));
}
return desc;
}
@@ -178,6 +200,8 @@ ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const
{
if (column.name == "_file" || column.name == "_path")
block.insert({column.type->createColumn(), column.type, column.name});
if (!getVirtualNamesForFileLikeStorage().contains(column.name))
block.insert({column.type->createColumn(), column.type, column.name});
}
block.insert({ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "_idx"});
@@ -189,6 +213,21 @@ ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const
return block.getByName("_idx").column;
}
std::map<std::string, std::string> parsePartitionMapFromPath(const std::string & path)
{
std::string pattern = "/([^/]+)=([^/]+)"; // Regex to capture key=value pairs
// Map to store the key-value pairs
std::map<std::string, std::string> key_values;
re2::StringPiece input_piece(path);
std::string key;
std::string value;
while (RE2::FindAndConsume(&input_piece, pattern, &key, &value))
key_values["_" + key] = value;
return key_values;
}
void addRequestedFileLikeStorageVirtualsToChunk(
Chunk & chunk, const NamesAndTypesList & requested_virtual_columns,
VirtualsForFileLikeStorage virtual_values)
@@ -226,6 +265,15 @@ void addRequestedFileLikeStorageVirtualsToChunk(
else
chunk.addColumn(virtual_column.type->createColumnConstWithDefaultValue(chunk.getNumRows())->convertToFullColumnIfConst());
}
else
{
auto it = virtual_values.hive_partitioning_map.find(virtual_column.getNameInStorage());
if (it != virtual_values.hive_partitioning_map.end())
{
chunk.addColumn(virtual_column.getTypeInStorage()->createColumnConst(chunk.getNumRows(), it->second)->convertToFullColumnIfConst());
virtual_values.hive_partitioning_map.erase(it);
}
}
}
}
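As a worked example of the parsing above, here is a minimal standalone sketch (illustrative only, not part of the commit; it assumes nothing beyond re2 and the standard library) that applies the same "/([^/]+)=([^/]+)" pattern to a sample Hive-style path:

#include <iostream>
#include <map>
#include <string>
#include <re2/re2.h>

int main()
{
    // Same pattern as parsePartitionMapFromPath above: one capture for the
    // key and one for the value of every "/key=value" path segment.
    std::string pattern = "/([^/]+)=([^/]+)";
    std::string path = "/store/column0=Elizabeth/column1=Gordon/sample.parquet";

    std::map<std::string, std::string> key_values;
    re2::StringPiece input_piece(path);
    std::string key;
    std::string value;

    // FindAndConsume advances input_piece past each match, so the loop
    // visits every partition directory exactly once.
    while (RE2::FindAndConsume(&input_piece, pattern, &key, &value))
        key_values["_" + key] = value; // virtual column names get a '_' prefix

    for (const auto & [name, val] : key_values)
        std::cout << name << '=' << val << '\n';
    // Output:
    // _column0=Elizabeth
    // _column1=Gordon
}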

View File

@@ -6,6 +6,8 @@
#include <Storages/SelectQueryInfo.h>
#include <Storages/VirtualColumnsDescription.h>
#include <map>
#include <string>
#include <unordered_set>
@@ -47,7 +49,7 @@ auto extractSingleValueFromBlock(const Block & block, const String & name)
}
NameSet getVirtualNamesForFileLikeStorage();
VirtualColumnsDescription getVirtualsForFileLikeStorage(const ColumnsDescription & storage_columns);
VirtualColumnsDescription getVirtualsForFileLikeStorage(const ColumnsDescription & storage_columns, const Strings & paths = {});
ActionsDAGPtr createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns);
@@ -74,9 +76,12 @@ struct VirtualsForFileLikeStorage
std::optional<size_t> size { std::nullopt };
const String * filename { nullptr };
std::optional<Poco::Timestamp> last_modified { std::nullopt };
std::map<std::string, std::string> hive_partitioning_map;
};
std::map<std::string, std::string> parsePartitionMapFromPath(const std::string & path);
void addRequestedFileLikeStorageVirtualsToChunk(
Chunk & chunk, const NamesAndTypesList & requested_virtual_columns,
VirtualsForFileLikeStorage virtual_values);
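For orientation, a condensed, hypothetical caller-side sketch of the new hive_partitioning_map field, mirroring the StorageFileSource::generate changes earlier in this diff (context, chunk, current_path and requested_virtual_columns are assumed to be in scope):

// Sketch only: parse the current file path into {"_key" -> "value"} pairs
// when the setting is enabled, then let the virtual-column machinery
// materialize them as constant columns of the chunk.
std::map<std::string, std::string> hive_map;
if (context->getSettingsRef().file_hive_partitioning)
    hive_map = VirtualColumnUtils::parsePartitionMapFromPath(current_path);

VirtualColumnUtils::addRequestedFileLikeStorageVirtualsToChunk(
    chunk, requested_virtual_columns,
    {
        .path = current_path,
        .hive_partitioning_map = hive_map,
    });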

View File

@@ -0,0 +1,39 @@
<clickhouse>
<remote_servers>
<simple_cluster>
<shard>
<replica>
<host>node_0</host>
<port>9000</port>
</replica>
<replica>
<host>node_1</host>
<port>9000</port>
</replica>
<replica>
<host>node_2</host>
<port>9000</port>
</replica>
</shard>
</simple_cluster>
<cluster_non_existent_port>
<shard>
<replica>
<host>node_0</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node_1</host>
<port>19000</port>
</replica>
</shard>
</cluster_non_existent_port>
</remote_servers>
<macros>
<default_cluster_macro>simple_cluster</default_cluster_macro>
</macros>
</clickhouse>

View File

@@ -0,0 +1,33 @@
<clickhouse>
<remote_servers>
<cluster_non_existent_port>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node1</host>
<port>19000</port>
</replica>
</shard>
</cluster_non_existent_port>
<test_cluster_two_shards>
<shard>
<replica>
<host>127.0.0.1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>127.0.0.2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster_two_shards>
</remote_servers>
</clickhouse>

View File

@@ -0,0 +1,9 @@
<!-- Sometimes azurite is super slow; the profiler makes it even worse -->
<clickhouse>
<profiles>
<default>
<query_profiler_real_time_period_ns>0</query_profiler_real_time_period_ns>
<query_profiler_cpu_time_period_ns>0</query_profiler_cpu_time_period_ns>
</default>
</profiles>
</clickhouse>

View File

@@ -0,0 +1,5 @@
<clickhouse>
<macros>
<default_cluster_macro>test_cluster_two_shards</default_cluster_macro>
</macros>
</clickhouse>

View File

@@ -0,0 +1,14 @@
<clickhouse>
<named_collections>
<azure_conf1>
<container>cont</container>
<blob_path>test_simple_write_named.csv</blob_path>
<structure>key UInt64, data String</structure>
<format>CSV</format>
</azure_conf1>
<azure_conf2>
<account_name>devstoreaccount1</account_name>
<account_key>Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==</account_key>
</azure_conf2>
</named_collections>
</clickhouse>

View File

@@ -0,0 +1,3 @@
<clickhouse>
<schema_inference_cache_max_elements_for_azure>2</schema_inference_cache_max_elements_for_azure>
</clickhouse>

View File

@@ -0,0 +1,3 @@
<clickhouse>
<schema_inference_cache_max_elements_for_hdfs>2</schema_inference_cache_max_elements_for_hdfs>
</clickhouse>

View File

@@ -0,0 +1,9 @@
<clickhouse>
<users>
<default>
<password></password>
<profile>default</profile>
<named_collection_control>1</named_collection_control>
</default>
</users>
</clickhouse>

View File

@@ -0,0 +1,204 @@
#!/usr/bin/env python3
import pytest
import time
import re
from azure.storage.blob import BlobServiceClient
from helpers.cluster import ClickHouseCluster, ClickHouseInstance, is_arm
if is_arm():
pytestmark = pytest.mark.skip
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"node",
main_configs=["configs/named_collections_azure.xml", "configs/schema_cache_azure.xml"],
user_configs=["configs/disable_profilers_azure.xml", "configs/users_azure.xml"],
with_azurite=True,
)
cluster.start()
container_client = cluster.blob_service_client.get_container_client("cont")
container_client.create_container()
yield cluster
finally:
cluster.shutdown()
def azure_query(
node, query, expect_error=False, try_num=10, settings=None, query_on_retry=None
):
for i in range(try_num):
try:
if expect_error:
return node.query_and_get_error(query, settings=settings)
else:
return node.query(query, settings=settings)
except Exception as ex:
retriable_errors = [
"DB::Exception: Azure::Core::Http::TransportException: Connection was closed by the server while trying to read a response",
"DB::Exception: Azure::Core::Http::TransportException: Connection closed before getting full response or response is less than expected",
"DB::Exception: Azure::Core::Http::TransportException: Error while polling for socket ready read",
"Azure::Core::Http::TransportException, e.what() = Connection was closed by the server while trying to read a response",
"Azure::Core::Http::TransportException, e.what() = Connection closed before getting full response or response is less than expected",
"Azure::Core::Http::TransportException, e.what() = Error while polling for socket ready read",
]
retry = False
for error in retriable_errors:
if error in str(ex):
retry = True
print(f"Try num: {i}. Having retriable error: {ex}")
time.sleep(i)
break
if not retry or i == try_num - 1:
raise Exception(ex)
if query_on_retry is not None:
node.query(query_on_retry)
continue
def get_azure_file_content(filename, port):
container_name = "cont"
connection_string = (
f"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
f"AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
f"BlobEndpoint=http://127.0.0.1:{port}/devstoreaccount1;"
)
blob_service_client = BlobServiceClient.from_connection_string(
str(connection_string)
)
container_client = blob_service_client.get_container_client(container_name)
blob_client = container_client.get_blob_client(filename)
download_stream = blob_client.download_blob()
return download_stream.readall().decode("utf-8")
@pytest.fixture(autouse=True, scope="function")
def delete_all_files(cluster):
port = cluster.env_variables["AZURITE_PORT"]
connection_string = (
f"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
f"AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
f"BlobEndpoint=http://127.0.0.1:{port}/devstoreaccount1;"
)
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
containers = blob_service_client.list_containers()
for container in containers:
container_client = blob_service_client.get_container_client(container)
blob_list = container_client.list_blobs()
for blob in blob_list:
print(blob)
blob_client = container_client.get_blob_client(blob)
blob_client.delete_blob()
assert len(list(container_client.list_blobs())) == 0
yield
def test_azure_partitioning_with_one_parameter(cluster):
# type: (ClickHouseCluster) -> None
node = cluster.instances["node"] # type: ClickHouseInstance
table_format = "column1 String, column2 String"
values = f"('Elizabeth', 'Gordon')"
path = "a/column1=Elizabeth/sample.csv"
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage(azure_conf2, storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}',"
f" container='cont', blob_path='{path}', format='CSV', compression='auto', structure='{table_format}') VALUES {values}",
)
query = (
f"SELECT column1, column2, _file, _path, _column1 FROM azureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
f"blob_path='{path}', format='CSV', structure='{table_format}')"
)
assert azure_query(node, query, settings={"azure_blob_storage_hive_partitioning": 1}).splitlines() == [
"Elizabeth\tGordon\tsample.csv\t{bucket}/{max_path}\tElizabeth".format(
bucket="cont", max_path=path
)
]
query = (
f"SELECT column2 FROM azureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
f"blob_path='{path}', format='CSV', structure='{table_format}') WHERE column1=_column1;"
)
assert azure_query(node, query, settings={"azure_blob_storage_hive_partitioning": 1}).splitlines() == [
"Gordon"
]
def test_azure_partitioning_with_two_parameters(cluster):
# type: (ClickHouseCluster) -> None
node = cluster.instances["node"] # type: ClickHouseInstance
table_format = "column1 String, column2 String"
values_1 = f"('Elizabeth', 'Gordon')"
values_2 = f"('Emilia', 'Gregor')"
path = "a/column1=Elizabeth/column2=Gordon/sample.csv"
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage(azure_conf2, storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}',"
f" container='cont', blob_path='{path}', format='CSV', compression='auto', structure='{table_format}') VALUES {values_1}, {values_2}",
)
query = (
f"SELECT column1, column2, _file, _path, _column1, _column2 FROM azureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
f"blob_path='{path}', format='CSV', structure='{table_format}') WHERE column1=_column1;"
)
assert azure_query(node, query, settings={"azure_blob_storage_hive_partitioning": 1}).splitlines() == [
"Elizabeth\tGordon\tsample.csv\t{bucket}/{max_path}\tElizabeth\tGordon".format(
bucket="cont", max_path=path
)
]
query = (
f"SELECT column1 FROM azureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
f"blob_path='{path}', format='CSV', structure='{table_format}') WHERE column2=_column2;"
)
assert azure_query(node, query, settings={"azure_blob_storage_hive_partitioning": 1}).splitlines() == [
"Elizabeth"
]
query = (
f"SELECT column1 FROM azureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
f"blob_path='{path}', format='CSV', structure='{table_format}') WHERE column2=_column2 AND column1=_column1;"
)
assert azure_query(node, query, settings={"azure_blob_storage_hive_partitioning": 1}).splitlines() == [
"Elizabeth"
]
def test_azure_partitioning_without_setting(cluster):
# type: (ClickHouseCluster) -> None
node = cluster.instances["node"] # type: ClickHouseInstance
table_format = "column1 String, column2 String"
values_1 = f"('Elizabeth', 'Gordon')"
values_2 = f"('Emilia', 'Gregor')"
path = "a/column1=Elizabeth/column2=Gordon/sample.csv"
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage(azure_conf2, storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}',"
f" container='cont', blob_path='{path}', format='CSV', compression='auto', structure='{table_format}') VALUES {values_1}, {values_2}",
)
query = (
f"SELECT column1, column2, _file, _path, _column1, _column2 FROM azureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
f"blob_path='{path}', format='CSV', structure='{table_format}') WHERE column1=_column1;"
)
pattern = re.compile(r"DB::Exception: Unknown expression identifier '.*' in scope.*", re.DOTALL)
with pytest.raises(Exception, match=pattern):
azure_query(node, query, settings={"azure_blob_storage_hive_partitioning": 0})

View File

@@ -0,0 +1,81 @@
#!/usr/bin/env python3
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster, is_arm
import re
if is_arm():
pytestmark = pytest.mark.skip
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1",
main_configs=[
"configs/macro_hdfs.xml",
"configs/schema_cache_hdfs.xml",
"configs/cluster_hdfs.xml",
],
with_hdfs=True,
)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_hdfs_partitioning_with_one_parameter(started_cluster):
hdfs_api = started_cluster.hdfs_api
hdfs_api.write_data(
f"/column0=Elizabeth/parquet_1", f"Elizabeth\tGordon\n"
)
assert (
hdfs_api.read_data(f"/column0=Elizabeth/parquet_1")
== f"Elizabeth\tGordon\n"
)
r = node1.query(
"SELECT _column0 FROM hdfs('hdfs://hdfs1:9000/column0=Elizabeth/parquet_1', 'TSV')", settings={"hdfs_hive_partitioning": 1}
)
assert (r == f"Elizabeth\n")
def test_hdfs_partitioning_with_two_parameters(started_cluster):
hdfs_api = started_cluster.hdfs_api
hdfs_api.write_data(
f"/column0=Elizabeth/column1=Gordon/parquet_2", f"Elizabeth\tGordon\n"
)
assert (
hdfs_api.read_data(f"/column0=Elizabeth/column1=Gordon/parquet_2")
== f"Elizabeth\tGordon\n"
)
r = node1.query(
"SELECT _column1 FROM hdfs('hdfs://hdfs1:9000/column0=Elizabeth/column1=Gordon/parquet_2', 'TSV');", settings={"hdfs_hive_partitioning": 1}
)
assert (r == f"Gordon\n")
def test_hdfs_partitioning_without_setting(started_cluster):
hdfs_api = started_cluster.hdfs_api
hdfs_api.write_data(
f"/column0=Elizabeth/column1=Gordon/parquet_2", f"Elizabeth\tGordon\n"
)
assert (
hdfs_api.read_data(f"/column0=Elizabeth/column1=Gordon/parquet_2")
== f"Elizabeth\tGordon\n"
)
pattern = re.compile(r"DB::Exception: Unknown expression identifier '.*' in scope.*", re.DOTALL)
with pytest.raises(QueryRuntimeException, match=pattern):
node1.query(f"SELECT _column1 FROM hdfs('hdfs://hdfs1:9000/column0=Elizabeth/column1=Gordon/parquet_2', 'TSV');", settings={"hdfs_hive_partitioning": 0})
if __name__ == "__main__":
cluster.start()
input("Cluster created, press any key to destroy...")
cluster.shutdown()

View File

@@ -0,0 +1,96 @@
TESTING THE FILE HIVE PARTITIONING
first last Elizabeth
Jorge Frank Elizabeth
Hunter Moreno Elizabeth
Esther Guzman Elizabeth
Dennis Stephens Elizabeth
Nettie Franklin Elizabeth
Stanley Gibson Elizabeth
Eugenia Greer Elizabeth
Jeffery Delgado Elizabeth
Clara Cross Elizabeth
Elizabeth Gordon Elizabeth
Eva Schmidt Elizabeth Schmidt
Samuel Schmidt Elizabeth Schmidt
Eva Schmidt Elizabeth
Samuel Schmidt Elizabeth
Elizabeth Gordon Elizabeth Gordon
Elizabeth Gordon Elizabeth
Elizabeth Gordon Elizabeth Gordon
Elizabeth Gordon Elizabeth
first last Elizabeth
Jorge Frank Elizabeth
Hunter Moreno Elizabeth
Esther Guzman Elizabeth
Dennis Stephens Elizabeth
Nettie Franklin Elizabeth
Stanley Gibson Elizabeth
Eugenia Greer Elizabeth
Jeffery Delgado Elizabeth
Clara Cross Elizabeth
Elizabeth Gordon Elizabeth
1
TESTING THE URL PARTITIONING
first last Elizabeth
Jorge Frank Elizabeth
Hunter Moreno Elizabeth
Esther Guzman Elizabeth
Dennis Stephens Elizabeth
Nettie Franklin Elizabeth
Stanley Gibson Elizabeth
Eugenia Greer Elizabeth
Jeffery Delgado Elizabeth
Clara Cross Elizabeth
Elizabeth Gordon Elizabeth
Eva Schmidt Elizabeth Schmidt
Samuel Schmidt Elizabeth Schmidt
Eva Schmidt Elizabeth
Samuel Schmidt Elizabeth
Elizabeth Gordon Elizabeth Gordon
Elizabeth Gordon Elizabeth
Elizabeth Gordon Elizabeth Gordon
Elizabeth Gordon Elizabeth
first last Elizabeth
Jorge Frank Elizabeth
Hunter Moreno Elizabeth
Esther Guzman Elizabeth
Dennis Stephens Elizabeth
Nettie Franklin Elizabeth
Stanley Gibson Elizabeth
Eugenia Greer Elizabeth
Jeffery Delgado Elizabeth
Clara Cross Elizabeth
Elizabeth Gordon Elizabeth
1
TESTING THE S3 PARTITIONING
first last Elizabeth
Jorge Frank Elizabeth
Hunter Moreno Elizabeth
Esther Guzman Elizabeth
Dennis Stephens Elizabeth
Nettie Franklin Elizabeth
Stanley Gibson Elizabeth
Eugenia Greer Elizabeth
Jeffery Delgado Elizabeth
Clara Cross Elizabeth
Elizabeth Gordon Elizabeth
Eva Schmidt Elizabeth Schmidt
Samuel Schmidt Elizabeth Schmidt
Eva Schmidt Elizabeth
Samuel Schmidt Elizabeth
Elizabeth Gordon Elizabeth Gordon
Elizabeth Gordon Elizabeth
Elizabeth Gordon Elizabeth Gordon
Elizabeth Gordon Elizabeth
first last Elizabeth
Jorge Frank Elizabeth
Hunter Moreno Elizabeth
Esther Guzman Elizabeth
Dennis Stephens Elizabeth
Nettie Franklin Elizabeth
Stanley Gibson Elizabeth
Eugenia Greer Elizabeth
Jeffery Delgado Elizabeth
Clara Cross Elizabeth
Elizabeth Gordon Elizabeth
1

View File

@@ -0,0 +1,93 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_LOCAL -q "SELECT 'TESTING THE FILE HIVE PARTITIONING'"
$CLICKHOUSE_LOCAL -n -q """set file_hive_partitioning = 1;
SELECT *, _column0 FROM file('$CURDIR/data_hive/partitioning/column0=Elizabeth/sample.parquet') LIMIT 10;
SELECT *, _column0 FROM file('$CURDIR/data_hive/partitioning/column0=Elizabeth/sample.parquet') WHERE column0 = _column0;
SELECT *, _column0, _column1 FROM file('$CURDIR/data_hive/partitioning/column0=Elizabeth/column1=Schmidt/sample.parquet') WHERE column1 = _column1;
SELECT *, _column0 FROM file('$CURDIR/data_hive/partitioning/column0=Elizabeth/column1=Schmidt/sample.parquet') WHERE column1 = _column1;
SELECT *, _column0, _column1 FROM file('$CURDIR/data_hive/partitioning/column0=Elizabeth/column1=Schmidt/sample.parquet') WHERE column0 = _column0 AND column1 = _column1;
SELECT *, _column0 FROM file('$CURDIR/data_hive/partitioning/column0=Elizabeth/column1=Schmidt/sample.parquet') WHERE column0 = _column0 AND column1 = _column1;
SELECT *, _column0, _column1 FROM file('$CURDIR/data_hive/partitioning/column0=Elizabeth/column1=Gordon/sample.parquet') WHERE column1 = _column1;
SELECT *, _column0 FROM file('$CURDIR/data_hive/partitioning/column0=Elizabeth/column1=Gordon/sample.parquet') WHERE column1 = _column1;
SELECT *, _column0, _column1 FROM file('$CURDIR/data_hive/partitioning/column0=Elizabeth/column1=Gordon/sample.parquet') WHERE column0 = _column0 AND column1 = _column1;
SELECT *, _column0 FROM file('$CURDIR/data_hive/partitioning/column0=Elizabeth/column1=Gordon/sample.parquet') WHERE column0 = _column0 AND column1 = _column1;
SELECT *, _non_existing_column FROM file('$CURDIR/data_hive/partitioning/non_existing_column=Elizabeth/sample.parquet') LIMIT 10;
SELECT *, _column0 FROM file('$CURDIR/data_hive/partitioning/column0=*/sample.parquet') WHERE column0 = _column0;"""
$CLICKHOUSE_LOCAL -n -q """set file_hive_partitioning = 0;
SELECT *, _column0 FROM file('$CURDIR/data_hive/partitioning/column0=Elizabeth/sample.parquet') LIMIT 10;""" 2>&1 | grep -c "UNKNOWN_IDENTIFIER"
$CLICKHOUSE_LOCAL -q "SELECT 'TESTING THE URL PARTITIONING'"
$CLICKHOUSE_LOCAL -n -q """set url_hive_partitioning = 1;
SELECT *, _column0 FROM url('http://localhost:11111/test/partitioning/column0=Elizabeth/sample.parquet') LIMIT 10;
SELECT *, _column0 FROM url('http://localhost:11111/test/partitioning/column0=Elizabeth/sample.parquet') WHERE column0 = _column0;
SELECT *, _column0, _column1 FROM url('http://localhost:11111/test/partitioning/column0=Elizabeth/column1=Schmidt/sample.parquet') WHERE column1 = _column1;
SELECT *, _column0 FROM url('http://localhost:11111/test/partitioning/column0=Elizabeth/column1=Schmidt/sample.parquet') WHERE column1 = _column1;
SELECT *, _column0, _column1 FROM url('http://localhost:11111/test/partitioning/column0=Elizabeth/column1=Schmidt/sample.parquet') WHERE column0 = _column0 AND column1 = _column1;
SELECT *, _column0 FROM url('http://localhost:11111/test/partitioning/column0=Elizabeth/column1=Schmidt/sample.parquet') WHERE column0 = _column0 AND column1 = _column1;
SELECT *, _column0, _column1 FROM url('http://localhost:11111/test/partitioning/column0=Elizabeth/column1=Gordon/sample.parquet') WHERE column1 = _column1;
SELECT *, _column0 FROM url('http://localhost:11111/test/partitioning/column0=Elizabeth/column1=Gordon/sample.parquet') WHERE column1 = _column1;
SELECT *, _column0, _column1 FROM url('http://localhost:11111/test/partitioning/column0=Elizabeth/column1=Gordon/sample.parquet') WHERE column0 = _column0 AND column1 = _column1;
SELECT *, _column0 FROM url('http://localhost:11111/test/partitioning/column0=Elizabeth/column1=Gordon/sample.parquet') WHERE column0 = _column0 AND column1 = _column1;
SELECT *, _non_existing_column FROM url('http://localhost:11111/test/partitioning/non_existing_column=Elizabeth/sample.parquet') LIMIT 10;
SELECT *, _column0 FROM url('http://localhost:11111/test/partitioning/column0=*/sample.parquet') WHERE column0 = _column0;"""
$CLICKHOUSE_LOCAL -n -q """set url_hive_partitioning = 0;
SELECT *, _column0 FROM url('http://localhost:11111/test/partitioning/column0=Elizabeth/sample.parquet') LIMIT 10;""" 2>&1 | grep -c "UNKNOWN_IDENTIFIER"
$CLICKHOUSE_LOCAL -q "SELECT 'TESTING THE S3 PARTITIONING'"
$CLICKHOUSE_LOCAL -n -q """set s3_hive_partitioning = 1;
SELECT *, _column0 FROM s3('http://localhost:11111/test/partitioning/column0=Elizabeth/sample.parquet') LIMIT 10;
SELECT *, _column0 FROM s3('http://localhost:11111/test/partitioning/column0=Elizabeth/sample.parquet') WHERE column0 = _column0;
SELECT *, _column0, _column1 FROM s3('http://localhost:11111/test/partitioning/column0=Elizabeth/column1=Schmidt/sample.parquet') WHERE column1 = _column1;
SELECT *, _column0 FROM s3('http://localhost:11111/test/partitioning/column0=Elizabeth/column1=Schmidt/sample.parquet') WHERE column1 = _column1;
SELECT *, _column0, _column1 FROM s3('http://localhost:11111/test/partitioning/column0=Elizabeth/column1=Schmidt/sample.parquet') WHERE column0 = _column0 AND column1 = _column1;
SELECT *, _column0 FROM s3('http://localhost:11111/test/partitioning/column0=Elizabeth/column1=Schmidt/sample.parquet') WHERE column0 = _column0 AND column1 = _column1;
SELECT *, _column0, _column1 FROM s3('http://localhost:11111/test/partitioning/column0=Elizabeth/column1=Gordon/sample.parquet') WHERE column1 = _column1;
SELECT *, _column0 FROM s3('http://localhost:11111/test/partitioning/column0=Elizabeth/column1=Gordon/sample.parquet') WHERE column1 = _column1;
SELECT *, _column0, _column1 FROM s3('http://localhost:11111/test/partitioning/column0=Elizabeth/column1=Gordon/sample.parquet') WHERE column0 = _column0 AND column1 = _column1;
SELECT *, _column0 FROM s3('http://localhost:11111/test/partitioning/column0=Elizabeth/column1=Gordon/sample.parquet') WHERE column0 = _column0 AND column1 = _column1;
SELECT *, _non_existing_column FROM s3('http://localhost:11111/test/partitioning/non_existing_column=Elizabeth/sample.parquet') LIMIT 10;
SELECT *, _column0 FROM s3('http://localhost:11111/test/partitioning/column0=*/sample.parquet') WHERE column0 = _column0;"""
$CLICKHOUSE_LOCAL -n -q """set s3_hive_partitioning = 0;
SELECT *, _column0 FROM s3('http://localhost:11111/test/partitioning/column0=Elizabeth/sample.parquet') LIMIT 10;""" 2>&1 | grep -c "UNKNOWN_IDENTIFIER"