Merge pull request #55796 from ClickHouse/Fix_azurite_port

Fix azurite port issue
Alexey Milovidov 2023-10-28 00:40:24 +02:00 committed by GitHub
commit 243499aad1
10 changed files with 461 additions and 300 deletions

View File

@@ -4,10 +4,10 @@ services:
azurite1:
image: mcr.microsoft.com/azure-storage/azurite
ports:
- "10000:10000"
- "${AZURITE_PORT}:${AZURITE_PORT}"
volumes:
- data1-1:/data1
command: azurite-blob --blobHost 0.0.0.0 --blobPort 10000 --debug /azurite_log
command: azurite-blob --blobHost 0.0.0.0 --blobPort ${AZURITE_PORT} --debug /azurite_log
volumes:
data1-1:
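
Both the host-side mapping and Azurite's --blobPort now come from AZURITE_PORT, so concurrent test runs can each bind their own free port instead of all competing for 10000. A sketch (not part of this commit) of how such a variable reaches docker compose; the port value and the exact invocation are illustrative:

import os
import subprocess

# Pass the dynamically chosen port (30000 is hypothetical) to docker compose,
# which substitutes it into the ${AZURITE_PORT} placeholders above.
env = os.environ.copy()
env["AZURITE_PORT"] = "30000"
subprocess.check_call(
    ["docker-compose", "-f", "docker_compose_azurite.yml", "up", "-d"], env=env
)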

View File

@@ -515,6 +515,7 @@ class ClickHouseCluster:
self.spark_session = None
self.with_azurite = False
self._azurite_port = 0
# available when with_hdfs == True
self.hdfs_host = "hdfs1"
@@ -734,6 +735,13 @@ class ClickHouseCluster:
self._kerberized_kafka_port = get_free_port()
return self._kerberized_kafka_port
@property
def azurite_port(self):
if self._azurite_port:
return self._azurite_port
self._azurite_port = get_free_port()
return self._azurite_port
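# Sketch for context (not part of this commit): get_free_port() is defined
# elsewhere in the helpers; the usual idiom binds to port 0 and lets the OS
# pick an unused port. Assumed shape only:
import socket

def get_free_port():
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("127.0.0.1", 0))  # port 0 asks the OS for any unused port
        return s.getsockname()[1]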
@property
def mongo_port(self):
if self._mongo_port:
@@ -1436,6 +1444,16 @@ class ClickHouseCluster:
def setup_azurite_cmd(self, instance, env_variables, docker_compose_yml_dir):
self.with_azurite = True
env_variables["AZURITE_PORT"] = str(self.azurite_port)
env_variables[
"AZURITE_STORAGE_ACCOUNT_URL"
] = f"http://azurite1:{env_variables['AZURITE_PORT']}/devstoreaccount1"
env_variables["AZURITE_CONNECTION_STRING"] = (
f"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
f"AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
f"BlobEndpoint={env_variables['AZURITE_STORAGE_ACCOUNT_URL']};"
)
self.base_cmd.extend(
["--file", p.join(docker_compose_yml_dir, "docker_compose_azurite.yml")]
)
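# Sketch for context (not part of this commit): with a hypothetical free port
# of 30000, setup_azurite_cmd yields these environment values.
expected_env = {
    "AZURITE_PORT": "30000",
    "AZURITE_STORAGE_ACCOUNT_URL": "http://azurite1:30000/devstoreaccount1",
    "AZURITE_CONNECTION_STRING": (
        "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
        "AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
        "BlobEndpoint=http://azurite1:30000/devstoreaccount1;"
    ),
}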
@@ -2524,7 +2542,11 @@ class ClickHouseCluster:
def wait_azurite_to_start(self, timeout=180):
from azure.storage.blob import BlobServiceClient
connection_string = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;"
connection_string = (
f"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
f"AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
f"BlobEndpoint=http://127.0.0.1:{self.env_variables['AZURITE_PORT']}/devstoreaccount1;"
)
time.sleep(1)
start = time.time()
while time.time() - start < timeout:
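# Sketch for context (not part of this commit): the loop body falls outside
# this hunk; a readiness probe of roughly this shape is the usual pattern.
def _wait_azurite_sketch(connection_string, timeout=180):
    import logging
    import time
    from azure.storage.blob import BlobServiceClient

    start = time.time()
    while time.time() - start < timeout:
        try:
            client = BlobServiceClient.from_connection_string(connection_string)
            logging.debug(client.get_account_information())
            return
        except Exception as ex:
            logging.debug("Azurite is not available yet: %s", ex)
            time.sleep(1)
    raise Exception(f"Azurite did not start within {timeout} seconds")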

View File

@@ -0,0 +1,27 @@
<clickhouse>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
<macros>
<cluster>test_cluster</cluster>
</macros>
<merge_tree>
<allow_remote_fs_zero_copy_replication>true</allow_remote_fs_zero_copy_replication>
<ratio_of_defaults_for_sparse_serialization>1.0</ratio_of_defaults_for_sparse_serialization>
</merge_tree>
</clickhouse>

View File

@@ -1,50 +0,0 @@
<clickhouse>
<storage_configuration>
<disks>
<blob_storage_disk>
<type>azure_blob_storage</type>
<storage_account_url>http://azurite1:10000/devstoreaccount1</storage_account_url>
<container_name>cont</container_name>
<skip_access_check>false</skip_access_check>
<!-- default credentials for Azurite storage account -->
<account_name>devstoreaccount1</account_name>
<account_key>Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==</account_key>
</blob_storage_disk>
</disks>
<policies>
<blob_storage_policy>
<volumes>
<main>
<disk>blob_storage_disk</disk>
</main>
</volumes>
</blob_storage_policy>
</policies>
</storage_configuration>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
<macros>
<cluster>test_cluster</cluster>
</macros>
<merge_tree>
<allow_remote_fs_zero_copy_replication>true</allow_remote_fs_zero_copy_replication>
<ratio_of_defaults_for_sparse_serialization>1.0</ratio_of_defaults_for_sparse_serialization>
</merge_tree>
</clickhouse>

View File

@@ -1,6 +1,8 @@
import logging
import pytest
from helpers.cluster import ClickHouseCluster
from test_storage_azure_blob_storage.test import azure_query
import os
logging.getLogger().setLevel(logging.INFO)
@@ -15,20 +17,65 @@ CLUSTER_NAME = "test_cluster"
drop_table_statement = f"DROP TABLE {TABLE_NAME} ON CLUSTER {CLUSTER_NAME} SYNC"
def generate_cluster_def(port):
path = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
"./_gen/storage_conf.xml",
)
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w") as f:
f.write(
f"""<clickhouse>
<storage_configuration>
<disks>
<blob_storage_disk>
<type>azure_blob_storage</type>
<storage_account_url>http://azurite1:{port}/devstoreaccount1</storage_account_url>
<container_name>cont</container_name>
<skip_access_check>false</skip_access_check>
<!-- default credentials for Azurite storage account -->
<account_name>devstoreaccount1</account_name>
<account_key>Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==</account_key>
</blob_storage_disk>
</disks>
<policies>
<blob_storage_policy>
<volumes>
<main>
<disk>blob_storage_disk</disk>
</main>
</volumes>
</blob_storage_policy>
</policies>
</storage_configuration>
</clickhouse>
"""
)
return path
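# Sketch for context (not part of this commit): generate_cluster_def renders
# the dynamic port into the disk config; 30000 below is a hypothetical value.
def _render_and_check_sketch():
    path = generate_cluster_def(30000)
    with open(path) as f:
        assert "http://azurite1:30000/devstoreaccount1" in f.read()
    return path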
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
port = cluster.azurite_port
path = generate_cluster_def(port)
cluster.add_instance(
NODE1,
main_configs=["configs/config.d/storage_conf.xml"],
main_configs=[
"configs/config.d/config.xml",
path,
],
macros={"replica": "1"},
with_azurite=True,
with_zookeeper=True,
)
cluster.add_instance(
NODE2,
main_configs=["configs/config.d/storage_conf.xml"],
main_configs=[
"configs/config.d/config.xml",
path,
],
macros={"replica": "2"},
with_azurite=True,
with_zookeeper=True,
@@ -57,7 +104,7 @@ def create_table(node, table_name, replica, **additional_settings):
ORDER BY id
SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))}"""
node.query(create_table_statement)
azure_query(node, create_table_statement)
assert node.query(f"SELECT COUNT(*) FROM {table_name} FORMAT Values") == "(0)"
@@ -80,27 +127,29 @@ def test_zero_copy_replication(cluster):
values1 = "(0,'data'),(1,'data')"
values2 = "(2,'data'),(3,'data')"
node1.query(f"INSERT INTO {TABLE_NAME} VALUES {values1}")
azure_query(node1, f"INSERT INTO {TABLE_NAME} VALUES {values1}")
node2.query(f"SYSTEM SYNC REPLICA {TABLE_NAME}")
assert (
node1.query(f"SELECT * FROM {TABLE_NAME} order by id FORMAT Values") == values1
azure_query(node1, f"SELECT * FROM {TABLE_NAME} order by id FORMAT Values")
== values1
)
assert (
node2.query(f"SELECT * FROM {TABLE_NAME} order by id FORMAT Values") == values1
azure_query(node2, f"SELECT * FROM {TABLE_NAME} order by id FORMAT Values")
== values1
)
# Based on version 21.x: there should be only one object larger than 100 bytes (checksums.txt), shared by both nodes
assert get_large_objects_count(blob_container_client) == 1
node2.query(f"INSERT INTO {TABLE_NAME} VALUES {values2}")
azure_query(node2, f"INSERT INTO {TABLE_NAME} VALUES {values2}")
node1.query(f"SYSTEM SYNC REPLICA {TABLE_NAME}")
assert (
node2.query(f"SELECT * FROM {TABLE_NAME} order by id FORMAT Values")
azure_query(node2, f"SELECT * FROM {TABLE_NAME} order by id FORMAT Values")
== values1 + "," + values2
)
assert (
node1.query(f"SELECT * FROM {TABLE_NAME} order by id FORMAT Values")
azure_query(node1, f"SELECT * FROM {TABLE_NAME} order by id FORMAT Values")
== values1 + "," + values2
)

View File

@@ -1,38 +0,0 @@
<clickhouse>
<storage_configuration>
<disks>
<blob_storage_disk>
<type>azure_blob_storage</type>
<storage_account_url>http://azurite1:10000/devstoreaccount1</storage_account_url>
<container_name>cont</container_name>
<skip_access_check>false</skip_access_check>
<!-- default credentials for Azurite storage account -->
<account_name>devstoreaccount1</account_name>
<account_key>Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==</account_key>
<max_single_part_upload_size>100000</max_single_part_upload_size>
<max_single_download_retries>10</max_single_download_retries>
<max_single_read_retries>10</max_single_read_retries>
<!-- NOTE: container_already_exists is omitted so that the container is:
a) created if it does not exist;
b) left as-is if it already exists, since the two instances would otherwise conflict with each other
-->
</blob_storage_disk>
<hdd>
<type>local</type>
<path>/</path>
</hdd>
</disks>
<policies>
<blob_storage_policy>
<volumes>
<main>
<disk>blob_storage_disk</disk>
</main>
<external>
<disk>hdd</disk>
</external>
</volumes>
</blob_storage_policy>
</policies>
</storage_configuration>
</clickhouse>

View File

@@ -18,15 +18,63 @@ LOCAL_DISK = "hdd"
CONTAINER_NAME = "cont"
def generate_cluster_def(port):
path = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
"./_gen/disk_storage_conf.xml",
)
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w") as f:
f.write(
f"""<clickhouse>
<storage_configuration>
<disks>
<blob_storage_disk>
<type>azure_blob_storage</type>
<storage_account_url>http://azurite1:{port}/devstoreaccount1</storage_account_url>
<container_name>cont</container_name>
<skip_access_check>false</skip_access_check>
<account_name>devstoreaccount1</account_name>
<account_key>Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==</account_key>
<max_single_part_upload_size>100000</max_single_part_upload_size>
<max_single_download_retries>10</max_single_download_retries>
<max_single_read_retries>10</max_single_read_retries>
</blob_storage_disk>
<hdd>
<type>local</type>
<path>/</path>
</hdd>
</disks>
<policies>
<blob_storage_policy>
<volumes>
<main>
<disk>blob_storage_disk</disk>
</main>
<external>
<disk>hdd</disk>
</external>
</volumes>
</blob_storage_policy>
</policies>
</storage_configuration>
</clickhouse>
"""
)
return path
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
port = cluster.azurite_port
path = generate_cluster_def(port)
cluster.add_instance(
NODE_NAME,
main_configs=[
"configs/config.d/storage_conf.xml",
"configs/config.d/bg_processing_pool_conf.xml",
path,
],
with_azurite=True,
)
@@ -490,9 +538,7 @@ def test_apply_new_settings(cluster):
create_table(node, TABLE_NAME)
config_path = os.path.join(
SCRIPT_DIR,
"./{}/node/configs/config.d/storage_conf.xml".format(
cluster.instances_dir_name
),
"./_gen/disk_storage_conf.xml".format(cluster.instances_dir_name),
)
azure_query(

View File

@@ -1,14 +1,12 @@
<clickhouse>
<named_collections>
<azure_conf1>
<connection_string>DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1;</connection_string>
<container>cont</container>
<blob_path>test_simple_write_named.csv</blob_path>
<structure>key UInt64, data String</structure>
<format>CSV</format>
</azure_conf1>
<azure_conf2>
<storage_account_url>http://azurite1:10000/devstoreaccount1</storage_account_url>
<account_name>devstoreaccount1</account_name>
<account_key>Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==</account_key>
</azure_conf2>
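
With the endpoint fields gone from the static XML, tests supply them per query as named-collection overrides. A sketch (not part of this commit) mirroring the calls later in this diff:

def _override_named_collection_sketch(cluster, node):
    # connection_string overrides the azure_conf1 named collection at use time,
    # injecting the dynamically generated endpoint.
    azure_query(
        node,
        f"CREATE TABLE t (key UInt64, data String) Engine = AzureBlobStorage(azure_conf1, "
        f"connection_string = '{cluster.env_variables['AZURITE_CONNECTION_STRING']}')",
    )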

View File

@@ -29,7 +29,6 @@ def cluster():
with_azurite=True,
)
cluster.start()
yield cluster
finally:
cluster.shutdown()
@@ -69,19 +68,29 @@ def azure_query(
continue
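# Sketch for context (not part of this commit): only the tail of azure_query is
# visible in this hunk. It is a retry wrapper around node.query used throughout
# these tests; assumed shape, the real retryable-error list may differ.
def _azure_query_sketch(node, query, expect_error="false", try_num=10):
    for i in range(try_num):
        try:
            if expect_error == "true":
                return node.query_and_get_error(query)
            return node.query(query)
        except Exception as ex:
            # Retry only transient connection errors against Azurite.
            if "Connection reset by peer" in str(ex) and i + 1 < try_num:
                continue
            raise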
def get_azure_file_content(filename):
def get_azure_file_content(filename, port):
container_name = "cont"
connection_string = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;"
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
connection_string = (
f"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
f"AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
f"BlobEndpoint=http://127.0.0.1:{port}/devstoreaccount1;"
)
blob_service_client = BlobServiceClient.from_connection_string(
str(connection_string)
)
container_client = blob_service_client.get_container_client(container_name)
blob_client = container_client.get_blob_client(filename)
download_stream = blob_client.download_blob()
return download_stream.readall().decode("utf-8")
def put_azure_file_content(filename, data):
def put_azure_file_content(filename, port, data):
container_name = "cont"
connection_string = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;"
connection_string = (
f"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
f"AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
f"BlobEndpoint=http://127.0.0.1:{port}/devstoreaccount1;"
)
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
try:
container_client = blob_service_client.create_container(container_name)
@@ -94,8 +103,13 @@ def put_azure_file_content(filename, data):
@pytest.fixture(autouse=True, scope="function")
def delete_all_files():
connection_string = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;"
def delete_all_files(cluster):
port = cluster.env_variables["AZURITE_PORT"]
connection_string = (
f"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
f"AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
f"BlobEndpoint=http://127.0.0.1:{port}/devstoreaccount1;"
)
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
containers = blob_service_client.list_containers()
for container in containers:
@@ -115,7 +129,8 @@ def test_create_table_connection_string(cluster):
node = cluster.instances["node"]
azure_query(
node,
"CREATE TABLE test_create_table_conn_string (key UInt64, data String) Engine = AzureBlobStorage('DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1/;', 'cont', 'test_create_connection_string', 'CSV')",
f"CREATE TABLE test_create_table_conn_string (key UInt64, data String) Engine = AzureBlobStorage('{cluster.env_variables['AZURITE_CONNECTION_STRING']}',"
f"'cont', 'test_create_connection_string', 'CSV')",
)
@@ -123,57 +138,67 @@ def test_create_table_account_string(cluster):
node = cluster.instances["node"]
azure_query(
node,
"CREATE TABLE test_create_table_account_url (key UInt64, data String) Engine = AzureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_create_connection_string', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV')",
f"CREATE TABLE test_create_table_account_url (key UInt64, data String) Engine = AzureBlobStorage('{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}',"
f"'cont', 'test_create_connection_string', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV')",
)
def test_simple_write_account_string(cluster):
node = cluster.instances["node"]
port = cluster.env_variables["AZURITE_PORT"]
azure_query(
node,
"CREATE TABLE test_simple_write (key UInt64, data String) Engine = AzureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_simple_write.csv', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV')",
f"CREATE TABLE test_simple_write (key UInt64, data String) Engine = AzureBlobStorage('{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}',"
f" 'cont', 'test_simple_write.csv', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV')",
)
azure_query(node, "INSERT INTO test_simple_write VALUES (1, 'a')")
print(get_azure_file_content("test_simple_write.csv"))
assert get_azure_file_content("test_simple_write.csv") == '1,"a"\n'
print(get_azure_file_content("test_simple_write.csv", port))
assert get_azure_file_content("test_simple_write.csv", port) == '1,"a"\n'
def test_simple_write_connection_string(cluster):
node = cluster.instances["node"]
port = cluster.env_variables["AZURITE_PORT"]
azure_query(
node,
"CREATE TABLE test_simple_write_connection_string (key UInt64, data String) Engine = AzureBlobStorage('DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1;', 'cont', 'test_simple_write_c.csv', 'CSV')",
f"CREATE TABLE test_simple_write_connection_string (key UInt64, data String) Engine = AzureBlobStorage('{cluster.env_variables['AZURITE_CONNECTION_STRING']}', "
f"'cont', 'test_simple_write_c.csv', 'CSV')",
)
azure_query(node, "INSERT INTO test_simple_write_connection_string VALUES (1, 'a')")
print(get_azure_file_content("test_simple_write_c.csv"))
assert get_azure_file_content("test_simple_write_c.csv") == '1,"a"\n'
print(get_azure_file_content("test_simple_write_c.csv", port))
assert get_azure_file_content("test_simple_write_c.csv", port) == '1,"a"\n'
def test_simple_write_named_collection_1(cluster):
node = cluster.instances["node"]
port = cluster.env_variables["AZURITE_PORT"]
azure_query(
node,
"CREATE TABLE test_simple_write_named_collection_1 (key UInt64, data String) Engine = AzureBlobStorage(azure_conf1)",
f"CREATE TABLE test_simple_write_named_collection_1 (key UInt64, data String) Engine = AzureBlobStorage(azure_conf1, "
f"connection_string = '{cluster.env_variables['AZURITE_CONNECTION_STRING']}')",
)
azure_query(
node, "INSERT INTO test_simple_write_named_collection_1 VALUES (1, 'a')"
)
print(get_azure_file_content("test_simple_write_named.csv"))
assert get_azure_file_content("test_simple_write_named.csv") == '1,"a"\n'
print(get_azure_file_content("test_simple_write_named.csv", port))
assert get_azure_file_content("test_simple_write_named.csv", port) == '1,"a"\n'
azure_query(node, "TRUNCATE TABLE test_simple_write_named_collection_1")
def test_simple_write_named_collection_2(cluster):
node = cluster.instances["node"]
port = cluster.env_variables["AZURITE_PORT"]
azure_query(
node,
"CREATE TABLE test_simple_write_named_collection_2 (key UInt64, data String) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='test_simple_write_named_2.csv', format='CSV')",
f"CREATE TABLE test_simple_write_named_collection_2 (key UInt64, data String) Engine = AzureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', "
f"container='cont', blob_path='test_simple_write_named_2.csv', format='CSV')",
)
azure_query(
node, "INSERT INTO test_simple_write_named_collection_2 VALUES (1, 'a')"
)
print(get_azure_file_content("test_simple_write_named_2.csv"))
assert get_azure_file_content("test_simple_write_named_2.csv") == '1,"a"\n'
print(get_azure_file_content("test_simple_write_named_2.csv", port))
assert get_azure_file_content("test_simple_write_named_2.csv", port) == '1,"a"\n'
def test_partition_by(cluster):
@@ -182,16 +207,19 @@ def test_partition_by(cluster):
partition_by = "column3"
values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)"
filename = "test_{_partition_id}.csv"
port = cluster.env_variables["AZURITE_PORT"]
azure_query(
node,
f"CREATE TABLE test_partitioned_write ({table_format}) Engine = AzureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', '{filename}', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV') PARTITION BY {partition_by}",
f"CREATE TABLE test_partitioned_write ({table_format}) Engine = AzureBlobStorage('{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}',"
f" 'cont', '{filename}', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV') "
f"PARTITION BY {partition_by}",
)
azure_query(node, f"INSERT INTO test_partitioned_write VALUES {values}")
assert "1,2,3\n" == get_azure_file_content("test_3.csv")
assert "3,2,1\n" == get_azure_file_content("test_1.csv")
assert "78,43,45\n" == get_azure_file_content("test_45.csv")
assert "1,2,3\n" == get_azure_file_content("test_3.csv", port)
assert "3,2,1\n" == get_azure_file_content("test_1.csv", port)
assert "78,43,45\n" == get_azure_file_content("test_45.csv", port)
def test_partition_by_string_column(cluster):
@@ -200,15 +228,18 @@ def test_partition_by_string_column(cluster):
partition_by = "col_str"
values = "(1, 'foo/bar'), (3, 'йцук'), (78, '你好')"
filename = "test_{_partition_id}.csv"
port = cluster.env_variables["AZURITE_PORT"]
azure_query(
node,
f"CREATE TABLE test_partitioned_string_write ({table_format}) Engine = AzureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', '{filename}', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV') PARTITION BY {partition_by}",
f"CREATE TABLE test_partitioned_string_write ({table_format}) Engine = AzureBlobStorage('{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}',"
f" 'cont', '{filename}', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV') "
f"PARTITION BY {partition_by}",
)
azure_query(node, f"INSERT INTO test_partitioned_string_write VALUES {values}")
assert '1,"foo/bar"\n' == get_azure_file_content("test_foo/bar.csv")
assert '3,"йцук"\n' == get_azure_file_content("test_йцук.csv")
assert '78,"你好"\n' == get_azure_file_content("test_你好.csv")
assert '1,"foo/bar"\n' == get_azure_file_content("test_foo/bar.csv", port)
assert '3,"йцук"\n' == get_azure_file_content("test_йцук.csv", port)
assert '78,"你好"\n' == get_azure_file_content("test_你好.csv", port)
def test_partition_by_const_column(cluster):
@@ -218,46 +249,54 @@ def test_partition_by_const_column(cluster):
partition_by = "'88'"
values_csv = "1,2,3\n3,2,1\n78,43,45\n"
filename = "test_{_partition_id}.csv"
port = cluster.env_variables["AZURITE_PORT"]
azure_query(
node,
f"CREATE TABLE test_partitioned_const_write ({table_format}) Engine = AzureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', '{filename}', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV') PARTITION BY {partition_by}",
f"CREATE TABLE test_partitioned_const_write ({table_format}) Engine = AzureBlobStorage('{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}',"
f" 'cont', '{filename}', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV')"
f" PARTITION BY {partition_by}",
)
azure_query(node, f"INSERT INTO test_partitioned_const_write VALUES {values}")
assert values_csv == get_azure_file_content("test_88.csv")
assert values_csv == get_azure_file_content("test_88.csv", port)
def test_truncate(cluster):
node = cluster.instances["node"]
port = cluster.env_variables["AZURITE_PORT"]
azure_query(
node,
"CREATE TABLE test_truncate (key UInt64, data String) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='test_truncate.csv', format='CSV')",
f"CREATE TABLE test_truncate (key UInt64, data String) Engine = AzureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', blob_path='test_truncate.csv', format='CSV')",
)
azure_query(node, "INSERT INTO test_truncate VALUES (1, 'a')")
assert get_azure_file_content("test_truncate.csv") == '1,"a"\n'
assert get_azure_file_content("test_truncate.csv", port) == '1,"a"\n'
azure_query(node, "TRUNCATE TABLE test_truncate")
with pytest.raises(Exception):
print(get_azure_file_content("test_truncate.csv"))
print(get_azure_file_content("test_truncate.csv", port))
def test_simple_read_write(cluster):
node = cluster.instances["node"]
port = cluster.env_variables["AZURITE_PORT"]
azure_query(
node,
"CREATE TABLE test_simple_read_write (key UInt64, data String) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='test_simple_read_write.csv', format='CSV')",
f"CREATE TABLE test_simple_read_write (key UInt64, data String) Engine = AzureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', blob_path='test_simple_read_write.csv', "
f"format='CSV')",
)
azure_query(node, "INSERT INTO test_simple_read_write VALUES (1, 'a')")
assert get_azure_file_content("test_simple_read_write.csv") == '1,"a"\n'
assert get_azure_file_content("test_simple_read_write.csv", port) == '1,"a"\n'
print(azure_query(node, "SELECT * FROM test_simple_read_write"))
assert azure_query(node, "SELECT * FROM test_simple_read_write") == "1\ta\n"
def test_create_new_files_on_insert(cluster):
node = cluster.instances["node"]
azure_query(
node,
f"create table test_multiple_inserts(a Int32, b String) ENGINE = AzureBlobStorage(azure_conf2, container='cont', blob_path='test_parquet', format='Parquet')",
f"create table test_multiple_inserts(a Int32, b String) ENGINE = AzureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', blob_path='test_parquet', format='Parquet')",
)
azure_query(node, "truncate table test_multiple_inserts")
azure_query(
@@ -281,10 +320,10 @@ def test_create_new_files_on_insert(cluster):
def test_overwrite(cluster):
node = cluster.instances["node"]
azure_query(
node,
f"create table test_overwrite(a Int32, b String) ENGINE = AzureBlobStorage(azure_conf2, container='cont', blob_path='test_parquet_overwrite', format='Parquet')",
f"create table test_overwrite(a Int32, b String) ENGINE = AzureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', blob_path='test_parquet_overwrite', format='Parquet')",
)
azure_query(node, "truncate table test_overwrite")
@@ -308,7 +347,8 @@ def test_insert_with_path_with_globs(cluster):
node = cluster.instances["node"]
azure_query(
node,
f"create table test_insert_globs(a Int32, b String) ENGINE = AzureBlobStorage(azure_conf2, container='cont', blob_path='test_insert_with_globs*', format='Parquet')",
f"create table test_insert_globs(a Int32, b String) ENGINE = AzureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', blob_path='test_insert_with_globs*', format='Parquet')",
)
node.query_and_get_error(
f"insert into table function test_insert_globs SELECT number, randomString(100) FROM numbers(500)"
@@ -331,7 +371,8 @@ def test_put_get_with_globs(cluster):
azure_query(
node,
f"CREATE TABLE test_put_{i}_{j} ({table_format}) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='{path}', format='CSV')",
f"CREATE TABLE test_put_{i}_{j} ({table_format}) Engine = AzureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', blob_path='{path}', format='CSV')",
)
query = f"insert into test_put_{i}_{j} VALUES {values}"
@@ -339,7 +380,8 @@ def test_put_get_with_globs(cluster):
azure_query(
node,
f"CREATE TABLE test_glob_select ({table_format}) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='{unique_prefix}/*_{{a,b,c,d}}/?.csv', format='CSV')",
f"CREATE TABLE test_glob_select ({table_format}) Engine = AzureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', blob_path='{unique_prefix}/*_{{a,b,c,d}}/?.csv', format='CSV')",
)
query = "select sum(column1), sum(column2), sum(column3), min(_file), max(_path) from test_glob_select"
assert azure_query(node, query).splitlines() == [
@@ -363,7 +405,8 @@ def test_azure_glob_scheherazade(cluster):
unique_num = random.randint(1, 10000)
azure_query(
node,
f"CREATE TABLE test_scheherazade_{i}_{unique_num} ({table_format}) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='{path}', format='CSV')",
f"CREATE TABLE test_scheherazade_{i}_{unique_num} ({table_format}) Engine = AzureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', blob_path='{path}', format='CSV')",
)
query = (
f"insert into test_scheherazade_{i}_{unique_num} VALUES {values}"
@@ -382,7 +425,8 @@ def test_azure_glob_scheherazade(cluster):
azure_query(
node,
f"CREATE TABLE test_glob_select_scheherazade ({table_format}) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='night_*/tale.csv', format='CSV')",
f"CREATE TABLE test_glob_select_scheherazade ({table_format}) Engine = AzureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', blob_path='night_*/tale.csv', format='CSV')",
)
query = "select count(), sum(column1), sum(column2), sum(column3) from test_glob_select_scheherazade"
assert azure_query(node, query).splitlines() == ["1001\t1001\t1001\t1001"]
@@ -394,6 +438,7 @@ def test_azure_glob_scheherazade(cluster):
)
def test_storage_azure_get_gzip(cluster, extension, method):
node = cluster.instances["node"]
port = cluster.env_variables["AZURITE_PORT"]
filename = f"test_get_gzip.{extension}"
name = f"test_get_gzip_{extension}"
data = [
@@ -420,14 +465,13 @@ def test_storage_azure_get_gzip(cluster, extension, method):
compressed = gzip.GzipFile(fileobj=buf, mode="wb")
compressed.write(("\n".join(data)).encode())
compressed.close()
put_azure_file_content(filename, buf.getvalue())
put_azure_file_content(filename, port, buf.getvalue())
azure_query(
node,
f"""CREATE TABLE {name} (name String, id UInt32) ENGINE = AzureBlobStorage(
azure_conf2, container='cont', blob_path ='{filename}',
format='CSV',
compression='{method}')""",
f"CREATE TABLE {name} (name String, id UInt32) ENGINE = AzureBlobStorage( azure_conf2,"
f" storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', blob_path ='{filename}',"
f"format='CSV', compression='{method}')",
)
assert azure_query(node, f"SELECT sum(id) FROM {name}").splitlines() == ["565"]
@@ -439,7 +483,9 @@ def test_schema_inference_no_globs(cluster):
table_format = "column1 UInt32, column2 String, column3 UInt32"
azure_query(
node,
f"CREATE TABLE test_schema_inference_src ({table_format}) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='test_schema_inference_no_globs.csv', format='CSVWithNames')",
f"CREATE TABLE test_schema_inference_src ({table_format}) Engine = AzureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
f"blob_path='test_schema_inference_no_globs.csv', format='CSVWithNames')",
)
query = f"insert into test_schema_inference_src SELECT number, toString(number), number * number FROM numbers(1000)"
@@ -447,7 +493,8 @@ def test_schema_inference_no_globs(cluster):
azure_query(
node,
f"CREATE TABLE test_select_inference Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='test_schema_inference_no_globs.csv')",
f"CREATE TABLE test_select_inference Engine = AzureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', blob_path='test_schema_inference_no_globs.csv')",
)
print(node.query("SHOW CREATE TABLE test_select_inference"))
@@ -474,7 +521,9 @@ def test_schema_inference_from_globs(cluster):
azure_query(
node,
f"CREATE TABLE test_schema_{i}_{j} ({table_format}) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='{path}', format='CSVWithNames')",
f"CREATE TABLE test_schema_{i}_{j} ({table_format}) Engine = AzureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
f"blob_path='{path}', format='CSVWithNames')",
)
query = f"insert into test_schema_{i}_{j} VALUES {values}"
@@ -482,7 +531,8 @@ def test_schema_inference_from_globs(cluster):
azure_query(
node,
f"CREATE TABLE test_glob_select_inference Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='{unique_prefix}/*_{{a,b,c,d}}/?.csv')",
f"CREATE TABLE test_glob_select_inference Engine = AzureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', blob_path='{unique_prefix}/*_{{a,b,c,d}}/?.csv')",
)
print(node.query("SHOW CREATE TABLE test_glob_select_inference"))
@@ -497,36 +547,47 @@ def test_schema_inference_from_globs(cluster):
def test_simple_write_account_string_table_function(cluster):
node = cluster.instances["node"]
port = cluster.env_variables["AZURITE_PORT"]
azure_query(
node,
"INSERT INTO TABLE FUNCTION azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_simple_write_tf.csv', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto', 'key UInt64, data String') VALUES (1, 'a')",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', "
f"'cont', 'test_simple_write_tf.csv', 'devstoreaccount1', "
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto', 'key UInt64, data String')"
f" VALUES (1, 'a')",
)
print(get_azure_file_content("test_simple_write_tf.csv"))
assert get_azure_file_content("test_simple_write_tf.csv") == '1,"a"\n'
print(get_azure_file_content("test_simple_write_tf.csv", port))
assert get_azure_file_content("test_simple_write_tf.csv", port) == '1,"a"\n'
def test_simple_write_connection_string_table_function(cluster):
node = cluster.instances["node"]
port = cluster.env_variables["AZURITE_PORT"]
azure_query(
node,
"INSERT INTO TABLE FUNCTION azureBlobStorage('DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1;', 'cont', 'test_simple_write_connection_tf.csv', 'CSV', 'auto', 'key UInt64, data String') VALUES (1, 'a')",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{cluster.env_variables['AZURITE_CONNECTION_STRING']}', "
f"'cont', 'test_simple_write_connection_tf.csv', 'CSV', 'auto', 'key UInt64, data String') VALUES (1, 'a')",
)
print(get_azure_file_content("test_simple_write_connection_tf.csv", port))
assert (
get_azure_file_content("test_simple_write_connection_tf.csv", port) == '1,"a"\n'
)
print(get_azure_file_content("test_simple_write_connection_tf.csv"))
assert get_azure_file_content("test_simple_write_connection_tf.csv") == '1,"a"\n'
def test_simple_write_named_collection_1_table_function(cluster):
node = cluster.instances["node"]
port = cluster.env_variables["AZURITE_PORT"]
azure_query(
node,
"INSERT INTO TABLE FUNCTION azureBlobStorage(azure_conf1) VALUES (1, 'a')",
f"INSERT INTO TABLE FUNCTION azureBlobStorage(azure_conf1, "
f"connection_string = '{cluster.env_variables['AZURITE_CONNECTION_STRING']}') VALUES (1, 'a')",
)
print(get_azure_file_content("test_simple_write_named.csv"))
assert get_azure_file_content("test_simple_write_named.csv") == '1,"a"\n'
print(get_azure_file_content("test_simple_write_named.csv", port))
assert get_azure_file_content("test_simple_write_named.csv", port) == '1,"a"\n'
azure_query(
node,
"CREATE TABLE drop_table (key UInt64, data String) Engine = AzureBlobStorage(azure_conf1)",
f"CREATE TABLE drop_table (key UInt64, data String) Engine = AzureBlobStorage(azure_conf1, "
f"connection_string = '{cluster.env_variables['AZURITE_CONNECTION_STRING']};')",
)
azure_query(
@@ -537,13 +598,14 @@ def test_simple_write_named_collection_1_table_function(cluster):
def test_simple_write_named_collection_2_table_function(cluster):
node = cluster.instances["node"]
port = cluster.env_variables["AZURITE_PORT"]
azure_query(
node,
"INSERT INTO TABLE FUNCTION azureBlobStorage(azure_conf2, container='cont', blob_path='test_simple_write_named_2_tf.csv', format='CSV', structure='key UInt64, data String') VALUES (1, 'a')",
f"INSERT INTO TABLE FUNCTION azureBlobStorage(azure_conf2, storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}',"
f" container='cont', blob_path='test_simple_write_named_2_tf.csv', format='CSV', structure='key UInt64, data String') VALUES (1, 'a')",
)
print(get_azure_file_content("test_simple_write_named_2_tf.csv"))
assert get_azure_file_content("test_simple_write_named_2_tf.csv") == '1,"a"\n'
print(get_azure_file_content("test_simple_write_named_2_tf.csv", port))
assert get_azure_file_content("test_simple_write_named_2_tf.csv", port) == '1,"a"\n'
def test_put_get_with_globs_tf(cluster):
@@ -562,9 +624,14 @@ def test_put_get_with_globs_tf(cluster):
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage(azure_conf2, container='cont', blob_path='{path}', format='CSV', compression='auto', structure='{table_format}') VALUES {values}",
f"INSERT INTO TABLE FUNCTION azureBlobStorage(azure_conf2, storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}',"
f" container='cont', blob_path='{path}', format='CSV', compression='auto', structure='{table_format}') VALUES {values}",
)
query = (
f"select sum(column1), sum(column2), sum(column3), min(_file), max(_path) from azureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
f"blob_path='{unique_prefix}/*_{{a,b,c,d}}/?.csv', format='CSV', structure='{table_format}')"
)
query = f"select sum(column1), sum(column2), sum(column3), min(_file), max(_path) from azureBlobStorage(azure_conf2, container='cont', blob_path='{unique_prefix}/*_{{a,b,c,d}}/?.csv', format='CSV', structure='{table_format}')"
assert azure_query(node, query).splitlines() == [
"450\t450\t900\t0.csv\t{bucket}/{max_path}".format(
bucket="cont", max_path=max_path
@@ -576,10 +643,18 @@ def test_schema_inference_no_globs_tf(cluster):
node = cluster.instances["node"] # type: ClickHouseInstance
table_format = "column1 UInt32, column2 String, column3 UInt32"
query = f"insert into table function azureBlobStorage(azure_conf2, container='cont', blob_path='test_schema_inference_no_globs_tf.csv', format='CSVWithNames', structure='{table_format}') SELECT number, toString(number), number * number FROM numbers(1000)"
query = (
f"insert into table function azureBlobStorage(azure_conf2, storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', "
f"container='cont', blob_path='test_schema_inference_no_globs_tf.csv', format='CSVWithNames', structure='{table_format}') "
f"SELECT number, toString(number), number * number FROM numbers(1000)"
)
azure_query(node, query)
query = "select sum(column1), sum(length(column2)), sum(column3), min(_file), max(_path) from azureBlobStorage(azure_conf2, container='cont', blob_path='test_schema_inference_no_globs_tf.csv')"
query = (
f"select sum(column1), sum(length(column2)), sum(column3), min(_file), max(_path) from azureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
f"blob_path='test_schema_inference_no_globs_tf.csv')"
)
assert azure_query(node, query).splitlines() == [
"499500\t2890\t332833500\ttest_schema_inference_no_globs_tf.csv\tcont/test_schema_inference_no_globs_tf.csv"
]
@@ -600,10 +675,17 @@ def test_schema_inference_from_globs_tf(cluster):
max_path = max(path, max_path)
values = f"({i},{j},{i + j})"
query = f"insert into table function azureBlobStorage(azure_conf2, container='cont', blob_path='{path}', format='CSVWithNames', structure='{table_format}') VALUES {values}"
query = (
f"insert into table function azureBlobStorage(azure_conf2, storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', "
f"container='cont', blob_path='{path}', format='CSVWithNames', structure='{table_format}') VALUES {values}"
)
azure_query(node, query)
query = f"select sum(column1), sum(column2), sum(column3), min(_file), max(_path) from azureBlobStorage(azure_conf2, container='cont', blob_path='{unique_prefix}/*_{{a,b,c,d}}/?.csv')"
query = (
f"select sum(column1), sum(column2), sum(column3), min(_file), max(_path) from azureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
f"blob_path='{unique_prefix}/*_{{a,b,c,d}}/?.csv')"
)
assert azure_query(node, query).splitlines() == [
"450\t450\t900\t0.csv\t{bucket}/{max_path}".format(
bucket="cont", max_path=max_path
@@ -617,15 +699,18 @@ def test_partition_by_tf(cluster):
partition_by = "column3"
values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)"
filename = "test_partition_tf_{_partition_id}.csv"
port = cluster.env_variables["AZURITE_PORT"]
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', '{filename}', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto', '{table_format}') PARTITION BY {partition_by} VALUES {values}",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', "
f"'cont', '{filename}', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', "
f"'CSV', 'auto', '{table_format}') PARTITION BY {partition_by} VALUES {values}",
)
assert "1,2,3\n" == get_azure_file_content("test_partition_tf_3.csv")
assert "3,2,1\n" == get_azure_file_content("test_partition_tf_1.csv")
assert "78,43,45\n" == get_azure_file_content("test_partition_tf_45.csv")
assert "1,2,3\n" == get_azure_file_content("test_partition_tf_3.csv", port)
assert "3,2,1\n" == get_azure_file_content("test_partition_tf_1.csv", port)
assert "78,43,45\n" == get_azure_file_content("test_partition_tf_45.csv", port)
def test_filter_using_file(cluster):
@@ -637,45 +722,64 @@ def test_filter_using_file(cluster):
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', '{filename}', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto', '{table_format}') PARTITION BY {partition_by} VALUES {values}",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', 'cont', '{filename}', "
f"'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto', "
f"'{table_format}') PARTITION BY {partition_by} VALUES {values}",
)
query = f"select count(*) from azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_partition_tf_*.csv', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto', '{table_format}') WHERE _file='test_partition_tf_3.csv'"
query = (
f"select count(*) from azureBlobStorage('{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', 'cont', 'test_partition_tf_*.csv', "
f"'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto', "
f"'{table_format}') WHERE _file='test_partition_tf_3.csv'"
)
assert azure_query(node, query) == "1\n"
def test_read_subcolumns(cluster):
node = cluster.instances["node"]
storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
azure_query(
node,
"INSERT INTO TABLE FUNCTION azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_subcolumns.tsv', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)') select ((1, 2), 3)",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_subcolumns.tsv', "
f"'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto',"
f" 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)') select ((1, 2), 3)",
)
azure_query(
node,
"INSERT INTO TABLE FUNCTION azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_subcolumns.jsonl', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)') select ((1, 2), 3)",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_subcolumns.jsonl', "
f"'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', "
f"'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)') select ((1, 2), 3)",
)
res = node.query(
f"select a.b.d, _path, a.b, _file, a.e from azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_subcolumns.tsv', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)')"
f"select a.b.d, _path, a.b, _file, a.e from azureBlobStorage('{storage_account_url}', 'cont', 'test_subcolumns.tsv',"
f" 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto',"
f" 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)')"
)
assert res == "2\tcont/test_subcolumns.tsv\t(1,2)\ttest_subcolumns.tsv\t3\n"
res = node.query(
f"select a.b.d, _path, a.b, _file, a.e from azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_subcolumns.jsonl', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)')"
f"select a.b.d, _path, a.b, _file, a.e from azureBlobStorage('{storage_account_url}', 'cont', 'test_subcolumns.jsonl',"
f" 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', "
f"'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)')"
)
assert res == "2\tcont/test_subcolumns.jsonl\t(1,2)\ttest_subcolumns.jsonl\t3\n"
res = node.query(
f"select x.b.d, _path, x.b, _file, x.e from azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_subcolumns.jsonl', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x Tuple(b Tuple(c UInt32, d UInt32), e UInt32)')"
f"select x.b.d, _path, x.b, _file, x.e from azureBlobStorage('{storage_account_url}', 'cont', 'test_subcolumns.jsonl',"
f" 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', "
f"'x Tuple(b Tuple(c UInt32, d UInt32), e UInt32)')"
)
assert res == "0\tcont/test_subcolumns.jsonl\t(0,0)\ttest_subcolumns.jsonl\t0\n"
res = node.query(
f"select x.b.d, _path, x.b, _file, x.e from azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_subcolumns.jsonl', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x Tuple(b Tuple(c UInt32, d UInt32), e UInt32) default ((42, 42), 42)')"
f"select x.b.d, _path, x.b, _file, x.e from azureBlobStorage('{storage_account_url}', 'cont', 'test_subcolumns.jsonl',"
f" 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', "
f"'x Tuple(b Tuple(c UInt32, d UInt32), e UInt32) default ((42, 42), 42)')"
)
assert res == "42\tcont/test_subcolumns.jsonl\t(42,42)\ttest_subcolumns.jsonl\t42\n"
@@ -683,15 +787,18 @@ def test_read_subcolumns(cluster):
def test_read_from_not_existing_container(cluster):
node = cluster.instances["node"]
query = f"select * from azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont_not_exists', 'test_table.csv', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto')"
query = (
f"select * from azureBlobStorage('{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', 'cont_not_exists', 'test_table.csv', "
f"'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto')"
)
expected_err_msg = "container does not exist"
assert expected_err_msg in azure_query(node, query, expect_error="true")
def test_function_signatures(cluster):
node = cluster.instances["node"]
connection_string = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1;"
storage_account_url = "http://azurite1:10000/devstoreaccount1"
connection_string = cluster.env_variables["AZURITE_CONNECTION_STRING"]
storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
account_name = "devstoreaccount1"
account_key = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
azure_query(
@@ -745,7 +852,8 @@ def check_profile_event_for_query(instance, file, profile_event, amount):
query_pattern = f"azureBlobStorage%{file}".replace("'", "\\'")
res = int(
instance.query(
f"select ProfileEvents['{profile_event}'] from system.query_log where query like '%{query_pattern}%' and query not like '%ProfileEvents%' and type = 'QueryFinish' order by query_start_time_microseconds desc limit 1"
f"select ProfileEvents['{profile_event}'] from system.query_log where query like '%{query_pattern}%' and query not like '%ProfileEvents%' "
f"and type = 'QueryFinish' order by query_start_time_microseconds desc limit 1"
)
)
@@ -804,15 +912,16 @@ def check_cache(instance, expected_files):
def test_schema_inference_cache(cluster):
node = cluster.instances["node"]
connection_string = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1;"
storage_account_url = "http://azurite1:10000/devstoreaccount1"
connection_string = cluster.env_variables["AZURITE_CONNECTION_STRING"]
storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
account_name = "devstoreaccount1"
account_key = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
node.query("system drop schema cache")
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache0.jsonl', '{account_name}', '{account_key}') select * from numbers(100)",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache0.jsonl', '{account_name}', '{account_key}') "
f"select * from numbers(100)",
)
time.sleep(1)
@@ -826,7 +935,8 @@ def test_schema_inference_cache(cluster):
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache0.jsonl', '{account_name}', '{account_key}') select * from numbers(100) settings azure_truncate_on_insert=1",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache0.jsonl', '{account_name}', '{account_key}') "
f"select * from numbers(100) settings azure_truncate_on_insert=1",
)
time.sleep(1)
@@ -836,7 +946,8 @@ def test_schema_inference_cache(cluster):
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache1.jsonl', '{account_name}', '{account_key}') select * from numbers(100) settings azure_truncate_on_insert=1",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache1.jsonl', '{account_name}', '{account_key}') "
f"select * from numbers(100) settings azure_truncate_on_insert=1",
)
time.sleep(1)
@@ -849,7 +960,8 @@ def test_schema_inference_cache(cluster):
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache2.jsonl', '{account_name}', '{account_key}') select * from numbers(100) settings azure_truncate_on_insert=1",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache2.jsonl', '{account_name}', '{account_key}') "
f"select * from numbers(100) settings azure_truncate_on_insert=1",
)
time.sleep(1)
@@ -895,7 +1007,8 @@ def test_schema_inference_cache(cluster):
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache3.jsonl', '{account_name}', '{account_key}') select * from numbers(100) settings azure_truncate_on_insert=1",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache3.jsonl', '{account_name}', '{account_key}') "
f"select * from numbers(100) settings azure_truncate_on_insert=1",
)
time.sleep(1)
@@ -919,7 +1032,8 @@ def test_schema_inference_cache(cluster):
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache0.csv', '{account_name}', '{account_key}') select * from numbers(100) settings azure_truncate_on_insert=1",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache0.csv', '{account_name}', '{account_key}') "
f"select * from numbers(100) settings azure_truncate_on_insert=1",
)
time.sleep(1)
@@ -943,7 +1057,8 @@ def test_schema_inference_cache(cluster):
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache0.csv', '{account_name}', '{account_key}') select * from numbers(200) settings azure_truncate_on_insert=1",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache0.csv', '{account_name}', '{account_key}') "
f"select * from numbers(200) settings azure_truncate_on_insert=1",
)
time.sleep(1)
@@ -958,7 +1073,8 @@ def test_schema_inference_cache(cluster):
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache1.csv', '{account_name}', '{account_key}') select * from numbers(100) settings azure_truncate_on_insert=1",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache1.csv', '{account_name}', '{account_key}') "
f"select * from numbers(100) settings azure_truncate_on_insert=1",
)
time.sleep(1)
@@ -991,7 +1107,8 @@ def test_schema_inference_cache(cluster):
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache.parquet', '{account_name}', '{account_key}') select * from numbers(100) settings azure_truncate_on_insert=1",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache.parquet', '{account_name}', '{account_key}') "
f"select * from numbers(100) settings azure_truncate_on_insert=1",
)
time.sleep(1)
@@ -1007,23 +1124,29 @@ def test_filtering_by_file_or_path(cluster):
def test_filtering_by_file_or_path(cluster):
node = cluster.instances["node"]
storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
azure_query(
node,
"INSERT INTO TABLE FUNCTION azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_filter1.tsv', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x UInt64') select 1",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}','cont', 'test_filter1.tsv', 'devstoreaccount1', "
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x UInt64') select 1",
)
azure_query(
node,
"INSERT INTO TABLE FUNCTION azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_filter2.tsv', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x UInt64') select 2",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}','cont', 'test_filter2.tsv', 'devstoreaccount1', "
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x UInt64') select 2",
)
azure_query(
node,
"INSERT INTO TABLE FUNCTION azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_filter3.tsv', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x UInt64') select 3",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_filter3.tsv', 'devstoreaccount1', "
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x UInt64') select 3",
)
node.query(
f"select count() from azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_filter*.tsv', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x UInt64') where _file = 'test_filter1.tsv'"
f"select count() from azureBlobStorage('{storage_account_url}', 'cont', 'test_filter*.tsv', 'devstoreaccount1', "
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x UInt64') "
f"where _file = 'test_filter1.tsv'"
)
node.query("SYSTEM FLUSH LOGS")

View File

@@ -49,9 +49,13 @@ def cluster():
cluster.shutdown()
def get_azure_file_content(filename):
def get_azure_file_content(filename, port):
container_name = "cont"
connection_string = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;"
connection_string = (
f"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
f"AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
f"BlobEndpoint=http://127.0.0.1:{port}/devstoreaccount1;"
)
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
container_client = blob_service_client.get_container_client(container_name)
blob_client = container_client.get_blob_client(filename)
@@ -61,31 +65,28 @@ def get_azure_file_content(filename):
def test_select_all(cluster):
node = cluster.instances["node_0"]
port = cluster.env_variables["AZURITE_PORT"]
storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
azure_query(
node,
"INSERT INTO TABLE FUNCTION azureBlobStorage("
"'http://azurite1:10000/devstoreaccount1', 'cont', 'test_cluster_select_all.csv', 'devstoreaccount1', "
"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', "
"'auto', 'key UInt64, data String') VALUES (1, 'a'), (2, 'b')",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cluster_select_all.csv', 'devstoreaccount1',"
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto', 'key UInt64, data String') "
f"VALUES (1, 'a'), (2, 'b')",
)
print(get_azure_file_content("test_cluster_select_all.csv"))
print(get_azure_file_content("test_cluster_select_all.csv", port))
pure_azure = azure_query(
node,
"""
SELECT * from azureBlobStorage(
'http://azurite1:10000/devstoreaccount1', 'cont', 'test_cluster_select_all.csv', 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV',
'auto')""",
f"SELECT * from azureBlobStorage('{storage_account_url}', 'cont', 'test_cluster_select_all.csv', 'devstoreaccount1',"
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV','auto')",
)
print(pure_azure)
distributed_azure = azure_query(
node,
"""
SELECT * from azureBlobStorageCluster(
'simple_cluster', 'http://azurite1:10000/devstoreaccount1', 'cont', 'test_cluster_select_all.csv', 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV',
'auto')""",
f"SELECT * from azureBlobStorageCluster('simple_cluster', '{storage_account_url}', 'cont', 'test_cluster_select_all.csv', 'devstoreaccount1',"
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV',"
f"'auto')"
"",
)
print(distributed_azure)
assert TSV(pure_azure) == TSV(distributed_azure)
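# Both queries above hit the same dynamically provisioned Azurite endpoint; a
# test only needs the environment the cluster exported when Azurite support was
# enabled. A minimal sketch of that wiring, assuming the env_variables mapping
# populated during cluster setup:
port = cluster.env_variables["AZURITE_PORT"]
storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
assert storage_account_url.endswith(f":{port}/devstoreaccount1")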
@@ -93,31 +94,28 @@ def test_select_all(cluster):
def test_count(cluster):
node = cluster.instances["node_0"]
port = cluster.env_variables["AZURITE_PORT"]
storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
azure_query(
node,
"INSERT INTO TABLE FUNCTION azureBlobStorage("
"'http://azurite1:10000/devstoreaccount1', 'cont', 'test_cluster_count.csv', 'devstoreaccount1', "
"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', "
"'auto', 'key UInt64') VALUES (1), (2)",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cluster_count.csv', 'devstoreaccount1', "
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', "
f"'auto', 'key UInt64') VALUES (1), (2)",
)
print(get_azure_file_content("test_cluster_count.csv"))
print(get_azure_file_content("test_cluster_count.csv", port))
pure_azure = azure_query(
node,
"""
SELECT count(*) from azureBlobStorage(
'http://azurite1:10000/devstoreaccount1', 'cont', 'test_cluster_count.csv', 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV',
'auto', 'key UInt64')""",
f"SELECT count(*) from azureBlobStorage('{storage_account_url}', 'cont', 'test_cluster_count.csv', 'devstoreaccount1',"
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV',"
f"'auto', 'key UInt64')",
)
print(pure_azure)
distributed_azure = azure_query(
node,
"""
SELECT count(*) from azureBlobStorageCluster(
'simple_cluster', 'http://azurite1:10000/devstoreaccount1', 'cont', 'test_cluster_count.csv', 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV',
'auto', 'key UInt64')""",
f"SELECT count(*) from azureBlobStorageCluster('simple_cluster', '{storage_account_url}', 'cont', 'test_cluster_count.csv', "
f"'devstoreaccount1','Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV',"
f"'auto', 'key UInt64')",
)
print(distributed_azure)
assert TSV(pure_azure) == TSV(distributed_azure)
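# get_azure_file_content fetches the blob over the same mapped port, so the
# test could also assert on the raw CSV instead of only printing it. A minimal
# sketch, assuming the two inserted keys round-trip as one CSV line each:
assert get_azure_file_content("test_cluster_count.csv", port) == "1\n2\n"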
@@ -125,26 +123,25 @@ def test_count(cluster):
def test_union_all(cluster):
node = cluster.instances["node_0"]
storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
azure_query(
node,
"INSERT INTO TABLE FUNCTION azureBlobStorage("
"'http://azurite1:10000/devstoreaccount1', 'cont', 'test_parquet_union_all', 'devstoreaccount1', "
"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'Parquet', "
"'auto', 'a Int32, b String') VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_parquet_union_all', 'devstoreaccount1', "
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'Parquet', "
f"'auto', 'a Int32, b String') VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')",
)
pure_azure = azure_query(
node,
"""
f"""
SELECT * FROM
(
SELECT * from azureBlobStorage(
'http://azurite1:10000/devstoreaccount1', 'cont', 'test_parquet_union_all', 'devstoreaccount1',
SELECT * from azureBlobStorage('{storage_account_url}', 'cont', 'test_parquet_union_all', 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'Parquet',
'auto', 'a Int32, b String')
UNION ALL
SELECT * from azureBlobStorage(
'http://azurite1:10000/devstoreaccount1', 'cont', 'test_parquet_union_all', 'devstoreaccount1',
'{storage_account_url}', 'cont', 'test_parquet_union_all', 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'Parquet',
'auto', 'a Int32, b String')
)
@@ -153,18 +150,18 @@ def test_union_all(cluster):
)
azure_distributed = azure_query(
node,
"""
f"""
SELECT * FROM
(
SELECT * from azureBlobStorageCluster(
'simple_cluster',
'http://azurite1:10000/devstoreaccount1', 'cont', 'test_parquet_union_all', 'devstoreaccount1',
'{storage_account_url}', 'cont', 'test_parquet_union_all', 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'Parquet',
'auto', 'a Int32, b String')
UNION ALL
SELECT * from azureBlobStorageCluster(
'simple_cluster',
'http://azurite1:10000/devstoreaccount1', 'cont', 'test_parquet_union_all', 'devstoreaccount1',
'{storage_account_url}', 'cont', 'test_parquet_union_all', 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'Parquet',
'auto', 'a Int32, b String')
)
@@ -177,22 +174,18 @@ def test_union_all(cluster):
def test_skip_unavailable_shards(cluster):
node = cluster.instances["node_0"]
storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
azure_query(
node,
"INSERT INTO TABLE FUNCTION azureBlobStorage("
"'http://azurite1:10000/devstoreaccount1', 'cont', 'test_skip_unavailable.csv', 'devstoreaccount1', "
"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', "
"'auto', 'a UInt64') VALUES (1), (2)",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_skip_unavailable.csv', 'devstoreaccount1', "
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', "
f"'auto', 'a UInt64') VALUES (1), (2)",
)
result = azure_query(
node,
"""
SELECT count(*) from azureBlobStorageCluster(
'cluster_non_existent_port',
'http://azurite1:10000/devstoreaccount1', 'cont', 'test_skip_unavailable.csv', 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==')
SETTINGS skip_unavailable_shards = 1
""",
f"SELECT count(*) from azureBlobStorageCluster('cluster_non_existent_port','{storage_account_url}', 'cont', 'test_skip_unavailable.csv', "
f"'devstoreaccount1','Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==') "
f"SETTINGS skip_unavailable_shards = 1",
)
assert result == "2\n"
@@ -201,21 +194,17 @@ def test_skip_unavailable_shards(cluster):
def test_unset_skip_unavailable_shards(cluster):
# Although skip_unavailable_shards is not set, cluster table functions should always skip unavailable shards.
node = cluster.instances["node_0"]
storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
azure_query(
node,
"INSERT INTO TABLE FUNCTION azureBlobStorage("
"'http://azurite1:10000/devstoreaccount1', 'cont', 'test_unset_skip_unavailable.csv', 'devstoreaccount1', "
"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', "
"'auto', 'a UInt64') VALUES (1), (2)",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_unset_skip_unavailable.csv', 'devstoreaccount1', "
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', "
f"'auto', 'a UInt64') VALUES (1), (2)",
)
result = azure_query(
node,
"""
SELECT count(*) from azureBlobStorageCluster(
'cluster_non_existent_port',
'http://azurite1:10000/devstoreaccount1', 'cont', 'test_skip_unavailable.csv', 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==')
""",
f"SELECT count(*) from azureBlobStorageCluster('cluster_non_existent_port','{storage_account_url}', 'cont', 'test_skip_unavailable.csv', "
f"'devstoreaccount1','Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==')",
)
assert result == "2\n"
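# Together with the previous test, the expectation is that
# azureBlobStorageCluster tolerates dead shards whether or not
# skip_unavailable_shards is set. A minimal sketch of a parametrised variant
# covering both cases, assuming pytest and the helpers imported in this file
# (the test name is hypothetical):
import pytest

@pytest.mark.parametrize("settings", ["", " SETTINGS skip_unavailable_shards = 1"])
def test_skip_unavailable_both_ways(cluster, settings):
    node = cluster.instances["node_0"]
    storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
    result = azure_query(
        node,
        f"SELECT count(*) from azureBlobStorageCluster('cluster_non_existent_port', "
        f"'{storage_account_url}', 'cont', 'test_skip_unavailable.csv', 'devstoreaccount1', "
        f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==')"
        f"{settings}",
    )
    assert result == "2\n"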
@@ -223,58 +212,53 @@ def test_unset_skip_unavailable_shards(cluster):
def test_cluster_with_named_collection(cluster):
node = cluster.instances["node_0"]
storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
azure_query(
node,
"INSERT INTO TABLE FUNCTION azureBlobStorage("
"'http://azurite1:10000/devstoreaccount1', 'cont', 'test_cluster_with_named_collection.csv', 'devstoreaccount1', "
"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', "
"'auto', 'a UInt64') VALUES (1), (2)",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cluster_with_named_collection.csv', 'devstoreaccount1', "
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', "
f"'auto', 'a UInt64') VALUES (1), (2)",
)
pure_azure = azure_query(
node,
"""
SELECT * from azureBlobStorage(
'http://azurite1:10000/devstoreaccount1', 'cont', 'test_cluster_with_named_collection.csv', 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==')
""",
f"SELECT * from azureBlobStorage('{storage_account_url}', 'cont', 'test_cluster_with_named_collection.csv', 'devstoreaccount1',"
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==')",
)
azure_cluster = azure_query(
node,
"""
SELECT * from azureBlobStorageCluster(
'simple_cluster', azure_conf2, container='cont', blob_path='test_cluster_with_named_collection.csv')
""",
f"SELECT * from azureBlobStorageCluster('simple_cluster', azure_conf2, storage_account_url = '{storage_account_url}', container='cont', "
f"blob_path='test_cluster_with_named_collection.csv')",
)
assert TSV(pure_azure) == TSV(azure_cluster)
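# azure_conf2 is a named collection supplied by the node's config; the query
# above overrides its storage_account_url so the dynamically chosen port wins.
# A minimal sketch of creating an equivalent collection at runtime instead,
# assuming SQL-defined named collections are enabled on the node (the
# collection name azure_conf_runtime is hypothetical):
azure_query(
    node,
    f"CREATE NAMED COLLECTION IF NOT EXISTS azure_conf_runtime AS "
    f"storage_account_url = '{storage_account_url}', container = 'cont', "
    f"account_name = 'devstoreaccount1', "
    f"account_key = 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=='",
)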
def test_partition_parallel_readig_withcluster(cluster):
def test_partition_parallel_reading_with_cluster(cluster):
node = cluster.instances["node_0"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
partition_by = "column3"
values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)"
filename = "test_tf_{_partition_id}.csv"
port = cluster.env_variables["AZURITE_PORT"]
storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', '{filename}', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto', '{table_format}') PARTITION BY {partition_by} VALUES {values}",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', '{filename}', 'devstoreaccount1', "
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto', '{table_format}') "
f"PARTITION BY {partition_by} VALUES {values}",
)
assert "1,2,3\n" == get_azure_file_content("test_tf_3.csv")
assert "3,2,1\n" == get_azure_file_content("test_tf_1.csv")
assert "78,43,45\n" == get_azure_file_content("test_tf_45.csv")
assert "1,2,3\n" == get_azure_file_content("test_tf_3.csv", port)
assert "3,2,1\n" == get_azure_file_content("test_tf_1.csv", port)
assert "78,43,45\n" == get_azure_file_content("test_tf_45.csv", port)
azure_cluster = azure_query(
node,
"""
SELECT count(*) from azureBlobStorageCluster(
'simple_cluster',
azure_conf2, container='cont', blob_path='test_tf_*.csv', format='CSV', compression='auto', structure='column1 UInt32, column2 UInt32, column3 UInt32')
""",
f"SELECT count(*) from azureBlobStorageCluster('simple_cluster', azure_conf2, storage_account_url = '{storage_account_url}', "
f"container='cont', blob_path='test_tf_*.csv', format='CSV', compression='auto', structure='column1 UInt32, column2 UInt32, column3 UInt32')",
)
assert azure_cluster == "3\n"
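# The {_partition_id} macro in the blob path is replaced by the value of the
# PARTITION BY expression, which is why column3 values 3, 1 and 45 produce
# test_tf_3.csv, test_tf_1.csv and test_tf_45.csv above. A minimal sketch of
# the mechanism reduced to one column (file names here are illustrative):
azure_query(
    node,
    f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', "
    f"'part_{{_partition_id}}.csv', 'devstoreaccount1', "
    f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', "
    f"'CSV', 'auto', 'x UInt32') PARTITION BY x VALUES (7), (8)",
)
assert get_azure_file_content("part_7.csv", port) == "7\n"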