Merge branch 'master' into more-strict-tryparse

This commit is contained in:
Alexey Milovidov 2023-10-28 18:07:25 +02:00
commit 18e238e7c6
26 changed files with 542 additions and 331 deletions

View File

@ -4,10 +4,10 @@ services:
azurite1:
image: mcr.microsoft.com/azure-storage/azurite
ports:
- "10000:10000"
- "${AZURITE_PORT}:${AZURITE_PORT}"
volumes:
- data1-1:/data1
command: azurite-blob --blobHost 0.0.0.0 --blobPort 10000 --debug /azurite_log
command: azurite-blob --blobHost 0.0.0.0 --blobPort ${AZURITE_PORT} --debug /azurite_log
volumes:
data1-1:

View File

@ -7,16 +7,16 @@ sidebar_label: "История ClickHouse"
# История ClickHouse {#istoriia-clickhouse}
ClickHouse изначально разрабатывался для обеспечения работы [Яндекс.Метрики](https://metrika.yandex.ru/), [второй крупнейшей в мире](http://w3techs.com/technologies/overview/traffic_analysis/all) платформы для веб аналитики, и продолжает быть её ключевым компонентом. При более 13 триллионах записей в базе данных и более 20 миллиардах событий в сутки, ClickHouse позволяет генерировать индивидуально настроенные отчёты на лету напрямую из неагрегированных данных. Данная статья вкратце демонстрирует какие цели исторически стояли перед ClickHouse на ранних этапах его развития.
ClickHouse изначально разрабатывался для обеспечения работы [Яндекс.Метрики](https://metrika.yandex.ru/) — [второй крупнейшей в мире](http://w3techs.com/technologies/overview/traffic_analysis/all) платформы для веб-аналитики — и продолжает быть её ключевым компонентом. При более 13 триллионах записей в базе данных и более 20 миллиардах событий в сутки, ClickHouse позволяет генерировать индивидуально настроенные отчёты на лету напрямую из неагрегированных данных. Данная статья вкратце демонстрирует, какие цели исторически стояли перед ClickHouse на ранних этапах его развития.
Яндекс.Метрика на лету строит индивидуальные отчёты на основе хитов и визитов, с периодом и произвольными сегментами, задаваемыми конечным пользователем. Часто требуется построение сложных агрегатов, например числа уникальных пользователей. Новые данные для построения отчета поступают в реальном времени.
На апрель 2014, в Яндекс.Метрику поступало около 12 миллиардов событий (показов страниц и кликов мыши) ежедневно. Все эти события должны быть сохранены для возможности строить произвольные отчёты. Один запрос может потребовать просканировать миллионы строк за время не более нескольких сотен миллисекунд, или сотни миллионов строк за время не более нескольких секунд.
На апрель 2014 года в Яндекс.Метрику поступало около 12 миллиардов событий (показов страниц и кликов мыши) ежедневно. Все эти события должны быть сохранены для возможности строить произвольные отчёты. Один запрос может потребовать просканировать миллионы строк за время не более нескольких сотен миллисекунд, или сотни миллионов строк за время не более нескольких секунд.
## Использование в Яндекс.Метрике и других отделах Яндекса {#ispolzovanie-v-iandeks-metrike-i-drugikh-otdelakh-iandeksa}
В Яндекс.Метрике ClickHouse используется для нескольких задач.
Основная задача - построение отчётов в режиме онлайн по неагрегированным данным. Для решения этой задачи используется кластер из 374 серверов, хранящий более 20,3 триллионов строк в базе данных. Объём сжатых данных, без учёта дублирования и репликации, составляет около 2 ПБ. Объём несжатых данных (в формате tsv) составил бы, приблизительно, 17 ПБ.
Основная задача — построение отчётов в режиме онлайн по неагрегированным данным. Для решения этой задачи используется кластер из 374 серверов, хранящий более 20,3 триллионов строк в базе данных. Объём сжатых данных, без учёта дублирования и репликации, составляет около 2 ПБ. Объём несжатых данных (в формате tsv) составил бы, приблизительно, 17 ПБ.
Также ClickHouse используется:
@ -35,20 +35,20 @@ ClickHouse имеет более десятка инсталляций в дру
Но агрегированные данные являются очень ограниченным решением, по следующим причинам:
- вы должны заранее знать перечень отчётов, необходимых пользователю;
- то есть, пользователь не может построить произвольный отчёт;
- при агрегации по большому количеству ключей, объём данных не уменьшается и агрегация бесполезна;
- при большом количестве отчётов, получается слишком много вариантов агрегации (комбинаторный взрыв);
- то есть пользователь не может построить произвольный отчёт;
- при агрегации по большому количеству ключей объём данных не уменьшается и агрегация бесполезна;
- при большом количестве отчётов получается слишком много вариантов агрегации (комбинаторный взрыв);
- при агрегации по ключам высокой кардинальности (например, URL) объём данных уменьшается не сильно (менее чем в 2 раза);
- из-за этого, объём данных при агрегации может не уменьшиться, а вырасти;
- пользователи будут смотреть не все отчёты, которые мы для них посчитаем - то есть, большая часть вычислений бесполезна;
- возможно нарушение логической целостности данных для разных агрегаций;
- из-за этого объём данных при агрегации может не уменьшиться, а вырасти;
- пользователи будут смотреть не все отчёты, которые мы для них посчитаем — то есть большая часть вычислений бесполезна;
- возможно нарушение логической целостности данных для разных агрегаций.
Как видно, если ничего не агрегировать, и работать с неагрегированными данными, то это даже может уменьшить объём вычислений.
Как видно, если ничего не агрегировать и работать с неагрегированными данными, то это даже может уменьшить объём вычислений.
Впрочем, при агрегации, существенная часть работы выносится в оффлайне, и её можно делать сравнительно спокойно. Для сравнения, при онлайн вычислениях, вычисления надо делать так быстро, как это возможно, так как именно в момент вычислений пользователь ждёт результата.
Впрочем, при агрегации существенная часть работы ведётся в фоновом режиме и её можно делать сравнительно спокойно. А онлайн-вычисления надо делать так быстро, как это возможно, так как именно в момент вычислений пользователь ждёт результата.
В Яндекс.Метрике есть специализированная система для агрегированных данных - Metrage, на основе которой работает большинство отчётов.
Также в Яндекс.Метрике с 2009 года использовалась специализированная OLAP БД для неагрегированных данных - OLAPServer, на основе которой раньше работал конструктор отчётов.
В Яндекс.Метрике есть специализированная система для агрегированных данных — Metrage, на основе которой работает большинство отчётов.
Также в Яндекс.Метрике с 2009 года использовалась специализированная OLAP БД для неагрегированных данных — OLAPServer, на основе которой раньше работал конструктор отчётов.
OLAPServer хорошо подходил для неагрегированных данных, но содержал много ограничений, не позволяющих использовать его для всех отчётов так, как хочется: отсутствие поддержки типов данных (только числа), невозможность инкрементального обновления данных в реальном времени (только перезаписью данных за сутки). OLAPServer не является СУБД, а является специализированной БД.
Чтобы снять ограничения OLAPServer-а и решить задачу работы с неагрегированными данными для всех отчётов, разработана СУБД ClickHouse.
Чтобы снять ограничения OLAPServer и решить задачу работы с неагрегированными данными для всех отчётов, была разработана СУБД ClickHouse.

View File

@ -50,6 +50,9 @@
#include <Disks/registerDisks.h>
#include <incbin.h>
/// A minimal file used when the keeper is run without installation
INCBIN(keeper_resource_embedded_xml, SOURCE_DIR "/programs/keeper/keeper_embedded.xml");
int mainEntryClickHouseKeeper(int argc, char ** argv)
{
@ -158,6 +161,8 @@ int Keeper::run()
void Keeper::initialize(Poco::Util::Application & self)
{
ConfigProcessor::registerEmbeddedConfig("keeper_config.xml", std::string_view(reinterpret_cast<const char *>(gkeeper_resource_embedded_xmlData), gkeeper_resource_embedded_xmlSize));
BaseDaemon::initialize(self);
logger().information("starting up");

View File

@ -406,6 +406,9 @@
-->
<mark_cache_size>5368709120</mark_cache_size>
<!-- For marks of secondary indices.
-->
<index_mark_cache_size>5368709120</index_mark_cache_size>
<!-- If you enable the `min_bytes_to_use_mmap_io` setting,
the data in MergeTree tables can be read with mmap to avoid copying from kernel to userspace.

View File

@ -606,7 +606,7 @@ namespace ErrorCodes
APPLY_FOR_ERROR_CODES(M)
#undef M
constexpr ErrorCode END = 3000;
constexpr ErrorCode END = 1002;
ErrorPairHolder values[END + 1]{};
struct ErrorCodesNames

View File

@ -73,11 +73,11 @@ static constexpr auto DEFAULT_MARK_CACHE_POLICY = "SLRU";
static constexpr auto DEFAULT_MARK_CACHE_MAX_SIZE = 5368_MiB;
static constexpr auto DEFAULT_MARK_CACHE_SIZE_RATIO = 0.5l;
static constexpr auto DEFAULT_INDEX_UNCOMPRESSED_CACHE_POLICY = "SLRU";
static constexpr auto DEFAULT_INDEX_UNCOMPRESSED_CACHE_MAX_SIZE = 0_MiB;
static constexpr auto DEFAULT_INDEX_UNCOMPRESSED_CACHE_SIZE_RATIO = 0.5l;
static constexpr auto DEFAULT_INDEX_UNCOMPRESSED_CACHE_MAX_SIZE = 0;
static constexpr auto DEFAULT_INDEX_UNCOMPRESSED_CACHE_SIZE_RATIO = 0.5;
static constexpr auto DEFAULT_INDEX_MARK_CACHE_POLICY = "SLRU";
static constexpr auto DEFAULT_INDEX_MARK_CACHE_MAX_SIZE = 0_MiB;
static constexpr auto DEFAULT_INDEX_MARK_CACHE_SIZE_RATIO = 0.5l;
static constexpr auto DEFAULT_INDEX_MARK_CACHE_MAX_SIZE = 5368_MiB;
static constexpr auto DEFAULT_INDEX_MARK_CACHE_SIZE_RATIO = 0.3;
static constexpr auto DEFAULT_MMAP_CACHE_MAX_SIZE = 1_KiB; /// chosen by rolling dice
static constexpr auto DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_SIZE = 128_MiB;
static constexpr auto DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_ENTRIES = 10'000;

View File

@ -1279,8 +1279,15 @@ public:
bool date_and_datetime = (which_left.idx != which_right.idx) && (which_left.isDate() || which_left.isDate32() || which_left.isDateTime() || which_left.isDateTime64())
&& (which_right.isDate() || which_right.isDate32() || which_right.isDateTime() || which_right.isDateTime64());
/// Interval data types can be compared only when having equal units.
bool left_is_interval = which_left.isInterval();
bool right_is_interval = which_right.isInterval();
bool types_equal = left_type->equals(*right_type);
ColumnPtr res;
if (left_is_num && right_is_num && !date_and_datetime)
if (left_is_num && right_is_num && !date_and_datetime
&& (!left_is_interval || !right_is_interval || types_equal))
{
if (!((res = executeNumLeftType<UInt8>(col_left_untyped, col_right_untyped))
|| (res = executeNumLeftType<UInt16>(col_left_untyped, col_right_untyped))
@ -1372,7 +1379,7 @@ public:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Date related common types can only be UInt32/UInt64/Int32/Decimal");
return res;
}
else if (left_type->equals(*right_type))
else if (types_equal)
{
return executeGenericIdenticalTypes(col_left_untyped, col_right_untyped);
}

View File

@ -475,13 +475,12 @@ InterpreterSelectQuery::InterpreterSelectQuery(
/// Check support for FINAL for parallel replicas
bool is_query_with_final = isQueryWithFinal(query_info);
if (is_query_with_final && (!settings.parallel_replicas_custom_key.value.empty() || settings.allow_experimental_parallel_reading_from_replicas > 0))
if (is_query_with_final && settings.allow_experimental_parallel_reading_from_replicas > 0)
{
if (settings.allow_experimental_parallel_reading_from_replicas == 1)
{
LOG_DEBUG(log, "FINAL modifier is not supported with parallel replicas. Query will be executed without using them.");
context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
context->setSetting("parallel_replicas_custom_key", String{""});
}
else if (settings.allow_experimental_parallel_reading_from_replicas == 2)
{

View File

@ -89,7 +89,7 @@ ORDER BY index_type, expression, column_name, seq_in_index;)", database, table,
/// can be functional indexes.
/// Above SELECT tries to emulate that. Caveats:
/// 1. The primary key index sub-SELECT assumes the primary key expression is non-functional. Non-functional primary key indexes in
/// ClickHouse are possible but quiete obscure. In MySQL they are not possible at all.
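/// ClickHouse are possible but quite obscure. In MySQL they are not possible at all.
/// (NOTE: "Non-functional" at the start of the sentence above should read "Functional": functional primary key
/// indexes are the obscure case in ClickHouse, and the ones MySQL does not allow.)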
/// 2. Related to 1.: Poor man's tuple parsing with splitByString() in the PK sub-SELECT messes up for functional primary key index
/// expressions where the comma is not only used as separator between tuple components, e.g. in 'col1 + 1, concat(col2, col3)'.
/// 3. The data skipping index sub-SELECT assumes the index expression is functional. 3rd party tools that expect MySQL semantics from
@ -106,4 +106,3 @@ BlockIO InterpreterShowIndexesQuery::execute()
}

View File

@ -67,13 +67,13 @@ void StorageSystemZooKeeperConnection::fillData(MutableColumns & res_columns, Co
UInt16 port = static_cast<UInt16>(Poco::NumberParser::parseUnsigned(host_port.substr(offset + 1)));
UInt32 uptime = zookeeper->getSessionUptime();
time_t time = timeInSeconds(std::chrono::system_clock::now()) - uptime;
time_t connected_time = time(nullptr) - uptime;
columns[0]->insert(name);
columns[1]->insert(host);
columns[2]->insert(port);
columns[3]->insert(index);
columns[4]->insert(time);
columns[4]->insert(connected_time);
columns[5]->insert(uptime);
columns[6]->insert(zookeeper->expired());
columns[7]->insert(0);

View File

@ -515,6 +515,7 @@ class ClickHouseCluster:
self.spark_session = None
self.with_azurite = False
self._azurite_port = 0
# available when with_hdfs == True
self.hdfs_host = "hdfs1"
@ -734,6 +735,13 @@ class ClickHouseCluster:
self._kerberized_kafka_port = get_free_port()
return self._kerberized_kafka_port
@property
def azurite_port(self):
if self._azurite_port:
return self._azurite_port
self._azurite_port = get_free_port()
return self._azurite_port
@property
def mongo_port(self):
if self._mongo_port:
@ -1436,6 +1444,16 @@ class ClickHouseCluster:
def setup_azurite_cmd(self, instance, env_variables, docker_compose_yml_dir):
self.with_azurite = True
env_variables["AZURITE_PORT"] = str(self.azurite_port)
env_variables[
"AZURITE_STORAGE_ACCOUNT_URL"
] = f"http://azurite1:{env_variables['AZURITE_PORT']}/devstoreaccount1"
env_variables["AZURITE_CONNECTION_STRING"] = (
f"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
f"AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
f"BlobEndpoint={env_variables['AZURITE_STORAGE_ACCOUNT_URL']};"
)
self.base_cmd.extend(
["--file", p.join(docker_compose_yml_dir, "docker_compose_azurite.yml")]
)
@ -2524,7 +2542,11 @@ class ClickHouseCluster:
def wait_azurite_to_start(self, timeout=180):
from azure.storage.blob import BlobServiceClient
connection_string = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;"
connection_string = (
f"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
f"AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
f"BlobEndpoint=http://127.0.0.1:{self.env_variables['AZURITE_PORT']}/devstoreaccount1;"
)
time.sleep(1)
start = time.time()
while time.time() - start < timeout:

View File

@ -0,0 +1,27 @@
<clickhouse>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
<macros>
<cluster>test_cluster</cluster>
</macros>
<merge_tree>
<allow_remote_fs_zero_copy_replication>true</allow_remote_fs_zero_copy_replication>
<ratio_of_defaults_for_sparse_serialization>1.0</ratio_of_defaults_for_sparse_serialization>
</merge_tree>
</clickhouse>

View File

@ -1,50 +0,0 @@
<clickhouse>
<storage_configuration>
<disks>
<blob_storage_disk>
<type>azure_blob_storage</type>
<storage_account_url>http://azurite1:10000/devstoreaccount1</storage_account_url>
<container_name>cont</container_name>
<skip_access_check>false</skip_access_check>
<!-- default credentials for Azurite storage account -->
<account_name>devstoreaccount1</account_name>
<account_key>Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==</account_key>
</blob_storage_disk>
</disks>
<policies>
<blob_storage_policy>
<volumes>
<main>
<disk>blob_storage_disk</disk>
</main>
</volumes>
</blob_storage_policy>
</policies>
</storage_configuration>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
<macros>
<cluster>test_cluster</cluster>
</macros>
<merge_tree>
<allow_remote_fs_zero_copy_replication>true</allow_remote_fs_zero_copy_replication>
<ratio_of_defaults_for_sparse_serialization>1.0</ratio_of_defaults_for_sparse_serialization>
</merge_tree>
</clickhouse>

View File

@ -1,6 +1,8 @@
import logging
import pytest
from helpers.cluster import ClickHouseCluster
from test_storage_azure_blob_storage.test import azure_query
import os
logging.getLogger().setLevel(logging.INFO)
@ -15,20 +17,65 @@ CLUSTER_NAME = "test_cluster"
drop_table_statement = f"DROP TABLE {TABLE_NAME} ON CLUSTER {CLUSTER_NAME} SYNC"
def generate_cluster_def(port):
path = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
"./_gen/storage_conf.xml",
)
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w") as f:
f.write(
f"""<clickhouse>
<storage_configuration>
<disks>
<blob_storage_disk>
<type>azure_blob_storage</type>
<storage_account_url>http://azurite1:{port}/devstoreaccount1</storage_account_url>
<container_name>cont</container_name>
<skip_access_check>false</skip_access_check>
<!-- default credentials for Azurite storage account -->
<account_name>devstoreaccount1</account_name>
<account_key>Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==</account_key>
</blob_storage_disk>
</disks>
<policies>
<blob_storage_policy>
<volumes>
<main>
<disk>blob_storage_disk</disk>
</main>
</volumes>
</blob_storage_policy>
</policies>
</storage_configuration>
</clickhouse>
"""
)
return path
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
port = cluster.azurite_port
path = generate_cluster_def(port)
cluster.add_instance(
NODE1,
main_configs=["configs/config.d/storage_conf.xml"],
main_configs=[
"configs/config.d/config.xml",
path,
],
macros={"replica": "1"},
with_azurite=True,
with_zookeeper=True,
)
cluster.add_instance(
NODE2,
main_configs=["configs/config.d/storage_conf.xml"],
main_configs=[
"configs/config.d/config.xml",
path,
],
macros={"replica": "2"},
with_azurite=True,
with_zookeeper=True,
@ -57,7 +104,7 @@ def create_table(node, table_name, replica, **additional_settings):
ORDER BY id
SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))}"""
node.query(create_table_statement)
azure_query(node, create_table_statement)
assert node.query(f"SELECT COUNT(*) FROM {table_name} FORMAT Values") == "(0)"
@ -80,27 +127,29 @@ def test_zero_copy_replication(cluster):
values1 = "(0,'data'),(1,'data')"
values2 = "(2,'data'),(3,'data')"
node1.query(f"INSERT INTO {TABLE_NAME} VALUES {values1}")
azure_query(node1, f"INSERT INTO {TABLE_NAME} VALUES {values1}")
node2.query(f"SYSTEM SYNC REPLICA {TABLE_NAME}")
assert (
node1.query(f"SELECT * FROM {TABLE_NAME} order by id FORMAT Values") == values1
azure_query(node1, f"SELECT * FROM {TABLE_NAME} order by id FORMAT Values")
== values1
)
assert (
node2.query(f"SELECT * FROM {TABLE_NAME} order by id FORMAT Values") == values1
azure_query(node2, f"SELECT * FROM {TABLE_NAME} order by id FORMAT Values")
== values1
)
# Based on version 21.x - should be only one file with size 100+ (checksums.txt), used by both nodes
assert get_large_objects_count(blob_container_client) == 1
node2.query(f"INSERT INTO {TABLE_NAME} VALUES {values2}")
azure_query(node2, f"INSERT INTO {TABLE_NAME} VALUES {values2}")
node1.query(f"SYSTEM SYNC REPLICA {TABLE_NAME}")
assert (
node2.query(f"SELECT * FROM {TABLE_NAME} order by id FORMAT Values")
azure_query(node2, f"SELECT * FROM {TABLE_NAME} order by id FORMAT Values")
== values1 + "," + values2
)
assert (
node1.query(f"SELECT * FROM {TABLE_NAME} order by id FORMAT Values")
azure_query(node1, f"SELECT * FROM {TABLE_NAME} order by id FORMAT Values")
== values1 + "," + values2
)

View File

@ -1,38 +0,0 @@
<clickhouse>
<storage_configuration>
<disks>
<blob_storage_disk>
<type>azure_blob_storage</type>
<storage_account_url>http://azurite1:10000/devstoreaccount1</storage_account_url>
<container_name>cont</container_name>
<skip_access_check>false</skip_access_check>
<!-- default credentials for Azurite storage account -->
<account_name>devstoreaccount1</account_name>
<account_key>Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==</account_key>
<max_single_part_upload_size>100000</max_single_part_upload_size>
<max_single_download_retries>10</max_single_download_retries>
<max_single_read_retries>10</max_single_read_retries>
<!-- NOTE: container_already_exists is omitted to:
a) create it
b) ignore if it already exists, since there are two instances, that conflicts with each other
-->
</blob_storage_disk>
<hdd>
<type>local</type>
<path>/</path>
</hdd>
</disks>
<policies>
<blob_storage_policy>
<volumes>
<main>
<disk>blob_storage_disk</disk>
</main>
<external>
<disk>hdd</disk>
</external>
</volumes>
</blob_storage_policy>
</policies>
</storage_configuration>
</clickhouse>

View File

@ -18,15 +18,63 @@ LOCAL_DISK = "hdd"
CONTAINER_NAME = "cont"
def generate_cluster_def(port):
path = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
"./_gen/disk_storage_conf.xml",
)
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w") as f:
f.write(
f"""<clickhouse>
<storage_configuration>
<disks>
<blob_storage_disk>
<type>azure_blob_storage</type>
<storage_account_url>http://azurite1:{port}/devstoreaccount1</storage_account_url>
<container_name>cont</container_name>
<skip_access_check>false</skip_access_check>
<account_name>devstoreaccount1</account_name>
<account_key>Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==</account_key>
<max_single_part_upload_size>100000</max_single_part_upload_size>
<max_single_download_retries>10</max_single_download_retries>
<max_single_read_retries>10</max_single_read_retries>
</blob_storage_disk>
<hdd>
<type>local</type>
<path>/</path>
</hdd>
</disks>
<policies>
<blob_storage_policy>
<volumes>
<main>
<disk>blob_storage_disk</disk>
</main>
<external>
<disk>hdd</disk>
</external>
</volumes>
</blob_storage_policy>
</policies>
</storage_configuration>
</clickhouse>
"""
)
return path
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
port = cluster.azurite_port
path = generate_cluster_def(port)
cluster.add_instance(
NODE_NAME,
main_configs=[
"configs/config.d/storage_conf.xml",
"configs/config.d/bg_processing_pool_conf.xml",
path,
],
with_azurite=True,
)
@ -490,9 +538,7 @@ def test_apply_new_settings(cluster):
create_table(node, TABLE_NAME)
config_path = os.path.join(
SCRIPT_DIR,
"./{}/node/configs/config.d/storage_conf.xml".format(
cluster.instances_dir_name
),
"./_gen/disk_storage_conf.xml".format(cluster.instances_dir_name),
)
azure_query(

View File

@ -1,14 +1,12 @@
<clickhouse>
<named_collections>
<azure_conf1>
<connection_string>DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1;</connection_string>
<container>cont</container>
<blob_path>test_simple_write_named.csv</blob_path>
<structure>key UInt64, data String</structure>
<format>CSV</format>
</azure_conf1>
<azure_conf2>
<storage_account_url>http://azurite1:10000/devstoreaccount1</storage_account_url>
<account_name>devstoreaccount1</account_name>
<account_key>Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==</account_key>
</azure_conf2>

View File

@ -29,7 +29,6 @@ def cluster():
with_azurite=True,
)
cluster.start()
yield cluster
finally:
cluster.shutdown()
@ -69,19 +68,29 @@ def azure_query(
continue
def get_azure_file_content(filename):
def get_azure_file_content(filename, port):
container_name = "cont"
connection_string = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;"
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
connection_string = (
f"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
f"AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
f"BlobEndpoint=http://127.0.0.1:{port}/devstoreaccount1;"
)
blob_service_client = BlobServiceClient.from_connection_string(
str(connection_string)
)
container_client = blob_service_client.get_container_client(container_name)
blob_client = container_client.get_blob_client(filename)
download_stream = blob_client.download_blob()
return download_stream.readall().decode("utf-8")
def put_azure_file_content(filename, data):
def put_azure_file_content(filename, port, data):
container_name = "cont"
connection_string = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;"
connection_string = (
f"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
f"AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
f"BlobEndpoint=http://127.0.0.1:{port}/devstoreaccount1;"
)
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
try:
container_client = blob_service_client.create_container(container_name)
@ -94,8 +103,13 @@ def put_azure_file_content(filename, data):
@pytest.fixture(autouse=True, scope="function")
def delete_all_files():
connection_string = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;"
def delete_all_files(cluster):
port = cluster.env_variables["AZURITE_PORT"]
connection_string = (
f"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
f"AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
f"BlobEndpoint=http://127.0.0.1:{port}/devstoreaccount1;"
)
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
containers = blob_service_client.list_containers()
for container in containers:
@ -115,7 +129,8 @@ def test_create_table_connection_string(cluster):
node = cluster.instances["node"]
azure_query(
node,
"CREATE TABLE test_create_table_conn_string (key UInt64, data String) Engine = AzureBlobStorage('DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1/;', 'cont', 'test_create_connection_string', 'CSV')",
f"CREATE TABLE test_create_table_conn_string (key UInt64, data String) Engine = AzureBlobStorage('{cluster.env_variables['AZURITE_CONNECTION_STRING']}',"
f"'cont', 'test_create_connection_string', 'CSV')",
)
@ -123,57 +138,67 @@ def test_create_table_account_string(cluster):
node = cluster.instances["node"]
azure_query(
node,
"CREATE TABLE test_create_table_account_url (key UInt64, data String) Engine = AzureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_create_connection_string', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV')",
f"CREATE TABLE test_create_table_account_url (key UInt64, data String) Engine = AzureBlobStorage('{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}',"
f"'cont', 'test_create_connection_string', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV')",
)
def test_simple_write_account_string(cluster):
node = cluster.instances["node"]
port = cluster.env_variables["AZURITE_PORT"]
azure_query(
node,
"CREATE TABLE test_simple_write (key UInt64, data String) Engine = AzureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_simple_write.csv', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV')",
f"CREATE TABLE test_simple_write (key UInt64, data String) Engine = AzureBlobStorage('{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}',"
f" 'cont', 'test_simple_write.csv', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV')",
)
azure_query(node, "INSERT INTO test_simple_write VALUES (1, 'a')")
print(get_azure_file_content("test_simple_write.csv"))
assert get_azure_file_content("test_simple_write.csv") == '1,"a"\n'
print(get_azure_file_content("test_simple_write.csv", port))
assert get_azure_file_content("test_simple_write.csv", port) == '1,"a"\n'
def test_simple_write_connection_string(cluster):
node = cluster.instances["node"]
port = cluster.env_variables["AZURITE_PORT"]
azure_query(
node,
"CREATE TABLE test_simple_write_connection_string (key UInt64, data String) Engine = AzureBlobStorage('DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1;', 'cont', 'test_simple_write_c.csv', 'CSV')",
f"CREATE TABLE test_simple_write_connection_string (key UInt64, data String) Engine = AzureBlobStorage('{cluster.env_variables['AZURITE_CONNECTION_STRING']}', "
f"'cont', 'test_simple_write_c.csv', 'CSV')",
)
azure_query(node, "INSERT INTO test_simple_write_connection_string VALUES (1, 'a')")
print(get_azure_file_content("test_simple_write_c.csv"))
assert get_azure_file_content("test_simple_write_c.csv") == '1,"a"\n'
print(get_azure_file_content("test_simple_write_c.csv", port))
assert get_azure_file_content("test_simple_write_c.csv", port) == '1,"a"\n'
def test_simple_write_named_collection_1(cluster):
node = cluster.instances["node"]
port = cluster.env_variables["AZURITE_PORT"]
azure_query(
node,
"CREATE TABLE test_simple_write_named_collection_1 (key UInt64, data String) Engine = AzureBlobStorage(azure_conf1)",
f"CREATE TABLE test_simple_write_named_collection_1 (key UInt64, data String) Engine = AzureBlobStorage(azure_conf1, "
f"connection_string = '{cluster.env_variables['AZURITE_CONNECTION_STRING']}')",
)
azure_query(
node, "INSERT INTO test_simple_write_named_collection_1 VALUES (1, 'a')"
)
print(get_azure_file_content("test_simple_write_named.csv"))
assert get_azure_file_content("test_simple_write_named.csv") == '1,"a"\n'
print(get_azure_file_content("test_simple_write_named.csv", port))
assert get_azure_file_content("test_simple_write_named.csv", port) == '1,"a"\n'
azure_query(node, "TRUNCATE TABLE test_simple_write_named_collection_1")
def test_simple_write_named_collection_2(cluster):
node = cluster.instances["node"]
port = cluster.env_variables["AZURITE_PORT"]
azure_query(
node,
"CREATE TABLE test_simple_write_named_collection_2 (key UInt64, data String) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='test_simple_write_named_2.csv', format='CSV')",
f"CREATE TABLE test_simple_write_named_collection_2 (key UInt64, data String) Engine = AzureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', "
f"container='cont', blob_path='test_simple_write_named_2.csv', format='CSV')",
)
azure_query(
node, "INSERT INTO test_simple_write_named_collection_2 VALUES (1, 'a')"
)
print(get_azure_file_content("test_simple_write_named_2.csv"))
assert get_azure_file_content("test_simple_write_named_2.csv") == '1,"a"\n'
print(get_azure_file_content("test_simple_write_named_2.csv", port))
assert get_azure_file_content("test_simple_write_named_2.csv", port) == '1,"a"\n'
def test_partition_by(cluster):
@ -182,16 +207,19 @@ def test_partition_by(cluster):
partition_by = "column3"
values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)"
filename = "test_{_partition_id}.csv"
port = cluster.env_variables["AZURITE_PORT"]
azure_query(
node,
f"CREATE TABLE test_partitioned_write ({table_format}) Engine = AzureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', '{filename}', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV') PARTITION BY {partition_by}",
f"CREATE TABLE test_partitioned_write ({table_format}) Engine = AzureBlobStorage('{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}',"
f" 'cont', '{filename}', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV') "
f"PARTITION BY {partition_by}",
)
azure_query(node, f"INSERT INTO test_partitioned_write VALUES {values}")
assert "1,2,3\n" == get_azure_file_content("test_3.csv")
assert "3,2,1\n" == get_azure_file_content("test_1.csv")
assert "78,43,45\n" == get_azure_file_content("test_45.csv")
assert "1,2,3\n" == get_azure_file_content("test_3.csv", port)
assert "3,2,1\n" == get_azure_file_content("test_1.csv", port)
assert "78,43,45\n" == get_azure_file_content("test_45.csv", port)
def test_partition_by_string_column(cluster):
@ -200,15 +228,18 @@ def test_partition_by_string_column(cluster):
partition_by = "col_str"
values = "(1, 'foo/bar'), (3, 'йцук'), (78, '你好')"
filename = "test_{_partition_id}.csv"
port = cluster.env_variables["AZURITE_PORT"]
azure_query(
node,
f"CREATE TABLE test_partitioned_string_write ({table_format}) Engine = AzureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', '{filename}', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV') PARTITION BY {partition_by}",
f"CREATE TABLE test_partitioned_string_write ({table_format}) Engine = AzureBlobStorage('{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}',"
f" 'cont', '{filename}', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV') "
f"PARTITION BY {partition_by}",
)
azure_query(node, f"INSERT INTO test_partitioned_string_write VALUES {values}")
assert '1,"foo/bar"\n' == get_azure_file_content("test_foo/bar.csv")
assert '3,"йцук"\n' == get_azure_file_content("test_йцук.csv")
assert '78,"你好"\n' == get_azure_file_content("test_你好.csv")
assert '1,"foo/bar"\n' == get_azure_file_content("test_foo/bar.csv", port)
assert '3,"йцук"\n' == get_azure_file_content("test_йцук.csv", port)
assert '78,"你好"\n' == get_azure_file_content("test_你好.csv", port)
def test_partition_by_const_column(cluster):
@ -218,46 +249,54 @@ def test_partition_by_const_column(cluster):
partition_by = "'88'"
values_csv = "1,2,3\n3,2,1\n78,43,45\n"
filename = "test_{_partition_id}.csv"
port = cluster.env_variables["AZURITE_PORT"]
azure_query(
node,
f"CREATE TABLE test_partitioned_const_write ({table_format}) Engine = AzureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', '{filename}', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV') PARTITION BY {partition_by}",
f"CREATE TABLE test_partitioned_const_write ({table_format}) Engine = AzureBlobStorage('{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}',"
f" 'cont', '{filename}', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV')"
f" PARTITION BY {partition_by}",
)
azure_query(node, f"INSERT INTO test_partitioned_const_write VALUES {values}")
assert values_csv == get_azure_file_content("test_88.csv")
assert values_csv == get_azure_file_content("test_88.csv", port)
def test_truncate(cluster):
    """Check that TRUNCATE removes the blob backing an AzureBlobStorage table.

    One row is inserted and the blob content verified; after TRUNCATE, reading
    the same blob is expected to raise.
    """
    node = cluster.instances["node"]
    port = cluster.env_variables["AZURITE_PORT"]
    storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]

    azure_query(
        node,
        f"CREATE TABLE test_truncate (key UInt64, data String) Engine = AzureBlobStorage(azure_conf2, "
        f"storage_account_url = '{storage_account_url}', container='cont', blob_path='test_truncate.csv', format='CSV')",
    )
    azure_query(node, "INSERT INTO test_truncate VALUES (1, 'a')")
    assert get_azure_file_content("test_truncate.csv", port) == '1,"a"\n'

    azure_query(node, "TRUNCATE TABLE test_truncate")

    # Only the read with the proper (filename, port) arguments lives inside the
    # raises-block, so the expected exception can only come from the missing
    # blob and not from a malformed helper call.
    with pytest.raises(Exception):
        print(get_azure_file_content("test_truncate.csv", port))
def test_simple_read_write(cluster):
    """Insert one row into an AzureBlobStorage table and read it back.

    The row is verified twice: once as raw blob content (CSV) and once through
    a SELECT on the table (tab-separated output).
    """
    node = cluster.instances["node"]
    port = cluster.env_variables["AZURITE_PORT"]
    storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]

    azure_query(
        node,
        f"CREATE TABLE test_simple_read_write (key UInt64, data String) Engine = AzureBlobStorage(azure_conf2, "
        f"storage_account_url = '{storage_account_url}', container='cont', blob_path='test_simple_read_write.csv', "
        f"format='CSV')",
    )
    azure_query(node, "INSERT INTO test_simple_read_write VALUES (1, 'a')")
    assert get_azure_file_content("test_simple_read_write.csv", port) == '1,"a"\n'

    selected = azure_query(node, "SELECT * FROM test_simple_read_write")
    print(selected)
    assert selected == "1\ta\n"
def test_create_new_files_on_insert(cluster):
node = cluster.instances["node"]
azure_query(
node,
f"create table test_multiple_inserts(a Int32, b String) ENGINE = AzureBlobStorage(azure_conf2, container='cont', blob_path='test_parquet', format='Parquet')",
f"create table test_multiple_inserts(a Int32, b String) ENGINE = AzureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', blob_path='test_parquet', format='Parquet')",
)
azure_query(node, "truncate table test_multiple_inserts")
azure_query(
@ -281,10 +320,10 @@ def test_create_new_files_on_insert(cluster):
def test_overwrite(cluster):
node = cluster.instances["node"]
azure_query(
node,
f"create table test_overwrite(a Int32, b String) ENGINE = AzureBlobStorage(azure_conf2, container='cont', blob_path='test_parquet_overwrite', format='Parquet')",
f"create table test_overwrite(a Int32, b String) ENGINE = AzureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', blob_path='test_parquet_overwrite', format='Parquet')",
)
azure_query(node, "truncate table test_overwrite")
@ -308,7 +347,8 @@ def test_insert_with_path_with_globs(cluster):
node = cluster.instances["node"]
azure_query(
node,
f"create table test_insert_globs(a Int32, b String) ENGINE = AzureBlobStorage(azure_conf2, container='cont', blob_path='test_insert_with_globs*', format='Parquet')",
f"create table test_insert_globs(a Int32, b String) ENGINE = AzureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', blob_path='test_insert_with_globs*', format='Parquet')",
)
node.query_and_get_error(
f"insert into table function test_insert_globs SELECT number, randomString(100) FROM numbers(500)"
@ -331,7 +371,8 @@ def test_put_get_with_globs(cluster):
azure_query(
node,
f"CREATE TABLE test_put_{i}_{j} ({table_format}) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='{path}', format='CSV')",
f"CREATE TABLE test_put_{i}_{j} ({table_format}) Engine = AzureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', blob_path='{path}', format='CSV')",
)
query = f"insert into test_put_{i}_{j} VALUES {values}"
@ -339,7 +380,8 @@ def test_put_get_with_globs(cluster):
azure_query(
node,
f"CREATE TABLE test_glob_select ({table_format}) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='{unique_prefix}/*_{{a,b,c,d}}/?.csv', format='CSV')",
f"CREATE TABLE test_glob_select ({table_format}) Engine = AzureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', blob_path='{unique_prefix}/*_{{a,b,c,d}}/?.csv', format='CSV')",
)
query = "select sum(column1), sum(column2), sum(column3), min(_file), max(_path) from test_glob_select"
assert azure_query(node, query).splitlines() == [
@ -363,7 +405,8 @@ def test_azure_glob_scheherazade(cluster):
unique_num = random.randint(1, 10000)
azure_query(
node,
f"CREATE TABLE test_scheherazade_{i}_{unique_num} ({table_format}) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='{path}', format='CSV')",
f"CREATE TABLE test_scheherazade_{i}_{unique_num} ({table_format}) Engine = AzureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', blob_path='{path}', format='CSV')",
)
query = (
f"insert into test_scheherazade_{i}_{unique_num} VALUES {values}"
@ -382,7 +425,8 @@ def test_azure_glob_scheherazade(cluster):
azure_query(
node,
f"CREATE TABLE test_glob_select_scheherazade ({table_format}) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='night_*/tale.csv', format='CSV')",
f"CREATE TABLE test_glob_select_scheherazade ({table_format}) Engine = AzureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', blob_path='night_*/tale.csv', format='CSV')",
)
query = "select count(), sum(column1), sum(column2), sum(column3) from test_glob_select_scheherazade"
assert azure_query(node, query).splitlines() == ["1001\t1001\t1001\t1001"]
@ -394,6 +438,7 @@ def test_azure_glob_scheherazade(cluster):
)
def test_storage_azure_get_gzip(cluster, extension, method):
node = cluster.instances["node"]
port = cluster.env_variables["AZURITE_PORT"]
filename = f"test_get_gzip.{extension}"
name = f"test_get_gzip_{extension}"
data = [
@ -420,14 +465,13 @@ def test_storage_azure_get_gzip(cluster, extension, method):
compressed = gzip.GzipFile(fileobj=buf, mode="wb")
compressed.write(("\n".join(data)).encode())
compressed.close()
put_azure_file_content(filename, buf.getvalue())
put_azure_file_content(filename, port, buf.getvalue())
azure_query(
node,
f"""CREATE TABLE {name} (name String, id UInt32) ENGINE = AzureBlobStorage(
azure_conf2, container='cont', blob_path ='{filename}',
format='CSV',
compression='{method}')""",
f"CREATE TABLE {name} (name String, id UInt32) ENGINE = AzureBlobStorage( azure_conf2,"
f" storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', blob_path ='{filename}',"
f"format='CSV', compression='{method}')",
)
assert azure_query(node, f"SELECT sum(id) FROM {name}").splitlines() == ["565"]
@ -439,7 +483,9 @@ def test_schema_inference_no_globs(cluster):
table_format = "column1 UInt32, column2 String, column3 UInt32"
azure_query(
node,
f"CREATE TABLE test_schema_inference_src ({table_format}) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='test_schema_inference_no_globs.csv', format='CSVWithNames')",
f"CREATE TABLE test_schema_inference_src ({table_format}) Engine = AzureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
f"blob_path='test_schema_inference_no_globs.csv', format='CSVWithNames')",
)
query = f"insert into test_schema_inference_src SELECT number, toString(number), number * number FROM numbers(1000)"
@ -447,7 +493,8 @@ def test_schema_inference_no_globs(cluster):
azure_query(
node,
f"CREATE TABLE test_select_inference Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='test_schema_inference_no_globs.csv')",
f"CREATE TABLE test_select_inference Engine = AzureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', blob_path='test_schema_inference_no_globs.csv')",
)
print(node.query("SHOW CREATE TABLE test_select_inference"))
@ -474,7 +521,9 @@ def test_schema_inference_from_globs(cluster):
azure_query(
node,
f"CREATE TABLE test_schema_{i}_{j} ({table_format}) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='{path}', format='CSVWithNames')",
f"CREATE TABLE test_schema_{i}_{j} ({table_format}) Engine = AzureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
f"blob_path='{path}', format='CSVWithNames')",
)
query = f"insert into test_schema_{i}_{j} VALUES {values}"
@ -482,7 +531,8 @@ def test_schema_inference_from_globs(cluster):
azure_query(
node,
f"CREATE TABLE test_glob_select_inference Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='{unique_prefix}/*_{{a,b,c,d}}/?.csv')",
f"CREATE TABLE test_glob_select_inference Engine = AzureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', blob_path='{unique_prefix}/*_{{a,b,c,d}}/?.csv')",
)
print(node.query("SHOW CREATE TABLE test_glob_select_inference"))
@ -497,36 +547,47 @@ def test_schema_inference_from_globs(cluster):
def test_simple_write_account_string_table_function(cluster):
    """Insert one row via ``azureBlobStorage`` using URL + account name + account key.

    The storage account URL comes from the cluster environment, so no Azurite
    port is hard-coded in the query. The written blob is read back and
    compared with the expected CSV line.
    """
    node = cluster.instances["node"]
    port = cluster.env_variables["AZURITE_PORT"]
    storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]

    azure_query(
        node,
        f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', "
        f"'cont', 'test_simple_write_tf.csv', 'devstoreaccount1', "
        f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto', 'key UInt64, data String')"
        f" VALUES (1, 'a')",
    )

    content = get_azure_file_content("test_simple_write_tf.csv", port)
    print(content)
    assert content == '1,"a"\n'
def test_simple_write_connection_string_table_function(cluster):
    """Insert one row via ``azureBlobStorage`` using a full connection string.

    The connection string comes from the cluster environment. The written blob
    is read back and compared with the expected CSV line.
    """
    node = cluster.instances["node"]
    port = cluster.env_variables["AZURITE_PORT"]
    connection_string = cluster.env_variables["AZURITE_CONNECTION_STRING"]

    azure_query(
        node,
        f"INSERT INTO TABLE FUNCTION azureBlobStorage('{connection_string}', "
        f"'cont', 'test_simple_write_connection_tf.csv', 'CSV', 'auto', 'key UInt64, data String') VALUES (1, 'a')",
    )

    content = get_azure_file_content("test_simple_write_connection_tf.csv", port)
    print(content)
    assert content == '1,"a"\n'
def test_simple_write_named_collection_1_table_function(cluster):
node = cluster.instances["node"]
port = cluster.env_variables["AZURITE_PORT"]
azure_query(
node,
"INSERT INTO TABLE FUNCTION azureBlobStorage(azure_conf1) VALUES (1, 'a')",
f"INSERT INTO TABLE FUNCTION azureBlobStorage(azure_conf1, "
f"connection_string = '{cluster.env_variables['AZURITE_CONNECTION_STRING']}') VALUES (1, 'a')",
)
print(get_azure_file_content("test_simple_write_named.csv"))
assert get_azure_file_content("test_simple_write_named.csv") == '1,"a"\n'
print(get_azure_file_content("test_simple_write_named.csv", port))
assert get_azure_file_content("test_simple_write_named.csv", port) == '1,"a"\n'
azure_query(
node,
"CREATE TABLE drop_table (key UInt64, data String) Engine = AzureBlobStorage(azure_conf1)",
f"CREATE TABLE drop_table (key UInt64, data String) Engine = AzureBlobStorage(azure_conf1, "
f"connection_string = '{cluster.env_variables['AZURITE_CONNECTION_STRING']};')",
)
azure_query(
@ -537,13 +598,14 @@ def test_simple_write_named_collection_1_table_function(cluster):
def test_simple_write_named_collection_2_table_function(cluster):
    """Insert one row via ``azureBlobStorage`` using the ``azure_conf2`` named collection.

    The storage account URL from the cluster environment is passed as an
    override, together with container, blob path, format and structure. The
    written blob is read back and compared with the expected CSV line.
    """
    node = cluster.instances["node"]
    port = cluster.env_variables["AZURITE_PORT"]
    storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]

    azure_query(
        node,
        f"INSERT INTO TABLE FUNCTION azureBlobStorage(azure_conf2, storage_account_url = '{storage_account_url}',"
        f" container='cont', blob_path='test_simple_write_named_2_tf.csv', format='CSV', structure='key UInt64, data String') VALUES (1, 'a')",
    )

    content = get_azure_file_content("test_simple_write_named_2_tf.csv", port)
    print(content)
    assert content == '1,"a"\n'
def test_put_get_with_globs_tf(cluster):
@ -562,9 +624,14 @@ def test_put_get_with_globs_tf(cluster):
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage(azure_conf2, container='cont', blob_path='{path}', format='CSV', compression='auto', structure='{table_format}') VALUES {values}",
f"INSERT INTO TABLE FUNCTION azureBlobStorage(azure_conf2, storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}',"
f" container='cont', blob_path='{path}', format='CSV', compression='auto', structure='{table_format}') VALUES {values}",
)
query = f"select sum(column1), sum(column2), sum(column3), min(_file), max(_path) from azureBlobStorage(azure_conf2, container='cont', blob_path='{unique_prefix}/*_{{a,b,c,d}}/?.csv', format='CSV', structure='{table_format}')"
query = (
f"select sum(column1), sum(column2), sum(column3), min(_file), max(_path) from azureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
f"blob_path='{unique_prefix}/*_{{a,b,c,d}}/?.csv', format='CSV', structure='{table_format}')"
)
assert azure_query(node, query).splitlines() == [
"450\t450\t900\t0.csv\t{bucket}/{max_path}".format(
bucket="cont", max_path=max_path
@ -576,10 +643,18 @@ def test_schema_inference_no_globs_tf(cluster):
node = cluster.instances["node"] # type: ClickHouseInstance
table_format = "column1 UInt32, column2 String, column3 UInt32"
query = f"insert into table function azureBlobStorage(azure_conf2, container='cont', blob_path='test_schema_inference_no_globs_tf.csv', format='CSVWithNames', structure='{table_format}') SELECT number, toString(number), number * number FROM numbers(1000)"
query = (
f"insert into table function azureBlobStorage(azure_conf2, storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', "
f"container='cont', blob_path='test_schema_inference_no_globs_tf.csv', format='CSVWithNames', structure='{table_format}') "
f"SELECT number, toString(number), number * number FROM numbers(1000)"
)
azure_query(node, query)
query = "select sum(column1), sum(length(column2)), sum(column3), min(_file), max(_path) from azureBlobStorage(azure_conf2, container='cont', blob_path='test_schema_inference_no_globs_tf.csv')"
query = (
f"select sum(column1), sum(length(column2)), sum(column3), min(_file), max(_path) from azureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
f"blob_path='test_schema_inference_no_globs_tf.csv')"
)
assert azure_query(node, query).splitlines() == [
"499500\t2890\t332833500\ttest_schema_inference_no_globs_tf.csv\tcont/test_schema_inference_no_globs_tf.csv"
]
@ -600,10 +675,17 @@ def test_schema_inference_from_globs_tf(cluster):
max_path = max(path, max_path)
values = f"({i},{j},{i + j})"
query = f"insert into table function azureBlobStorage(azure_conf2, container='cont', blob_path='{path}', format='CSVWithNames', structure='{table_format}') VALUES {values}"
query = (
f"insert into table function azureBlobStorage(azure_conf2, storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', "
f"container='cont', blob_path='{path}', format='CSVWithNames', structure='{table_format}') VALUES {values}"
)
azure_query(node, query)
query = f"select sum(column1), sum(column2), sum(column3), min(_file), max(_path) from azureBlobStorage(azure_conf2, container='cont', blob_path='{unique_prefix}/*_{{a,b,c,d}}/?.csv')"
query = (
f"select sum(column1), sum(column2), sum(column3), min(_file), max(_path) from azureBlobStorage(azure_conf2, "
f"storage_account_url = '{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', container='cont', "
f"blob_path='{unique_prefix}/*_{{a,b,c,d}}/?.csv')"
)
assert azure_query(node, query).splitlines() == [
"450\t450\t900\t0.csv\t{bucket}/{max_path}".format(
bucket="cont", max_path=max_path
@ -617,15 +699,18 @@ def test_partition_by_tf(cluster):
partition_by = "column3"
values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)"
filename = "test_partition_tf_{_partition_id}.csv"
port = cluster.env_variables["AZURITE_PORT"]
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', '{filename}', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto', '{table_format}') PARTITION BY {partition_by} VALUES {values}",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', "
f"'cont', '{filename}', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', "
f"'CSV', 'auto', '{table_format}') PARTITION BY {partition_by} VALUES {values}",
)
assert "1,2,3\n" == get_azure_file_content("test_partition_tf_3.csv")
assert "3,2,1\n" == get_azure_file_content("test_partition_tf_1.csv")
assert "78,43,45\n" == get_azure_file_content("test_partition_tf_45.csv")
assert "1,2,3\n" == get_azure_file_content("test_partition_tf_3.csv", port)
assert "3,2,1\n" == get_azure_file_content("test_partition_tf_1.csv", port)
assert "78,43,45\n" == get_azure_file_content("test_partition_tf_45.csv", port)
def test_filter_using_file(cluster):
@ -637,45 +722,64 @@ def test_filter_using_file(cluster):
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', '{filename}', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto', '{table_format}') PARTITION BY {partition_by} VALUES {values}",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', 'cont', '{filename}', "
f"'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto', "
f"'{table_format}') PARTITION BY {partition_by} VALUES {values}",
)
query = f"select count(*) from azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_partition_tf_*.csv', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto', '{table_format}') WHERE _file='test_partition_tf_3.csv'"
query = (
f"select count(*) from azureBlobStorage('{cluster.env_variables['AZURITE_STORAGE_ACCOUNT_URL']}', 'cont', 'test_partition_tf_*.csv', "
f"'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto', "
f"'{table_format}') WHERE _file='test_partition_tf_3.csv'"
)
assert azure_query(node, query) == "1\n"
def test_read_subcolumns(cluster):
    """Check reading tuple subcolumns and virtual columns via ``azureBlobStorage``.

    The same nested tuple ``((1, 2), 3)`` is written to a ``.tsv`` and a
    ``.jsonl`` blob (format ``'auto'``). The test then selects subcolumns
    together with ``_path`` / ``_file`` and covers three cases:

    * the declared column ``a`` matches what was written;
    * the declared column ``x`` is not what was written, and zeros are expected;
    * the declared column ``x`` carries an explicit default, and that default
      is expected.
    """
    node = cluster.instances["node"]
    storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
    account_key = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="

    def table_function(blob, structure):
        # Renders the azureBlobStorage(...) call shared by every query below;
        # only the blob name and the declared structure vary.
        return (
            f"azureBlobStorage('{storage_account_url}', 'cont', '{blob}', "
            f"'devstoreaccount1', '{account_key}', 'auto', 'auto', "
            f"'{structure}')"
        )

    struct_a = "a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)"
    struct_x = "x Tuple(b Tuple(c UInt32, d UInt32), e UInt32)"
    struct_x_default = f"{struct_x} default ((42, 42), 42)"

    # Write the same row to both blobs.
    for blob in ("test_subcolumns.tsv", "test_subcolumns.jsonl"):
        azure_query(
            node,
            f"INSERT INTO TABLE FUNCTION {table_function(blob, struct_a)} select ((1, 2), 3)",
        )

    res = node.query(
        f"select a.b.d, _path, a.b, _file, a.e from {table_function('test_subcolumns.tsv', struct_a)}"
    )
    assert res == "2\tcont/test_subcolumns.tsv\t(1,2)\ttest_subcolumns.tsv\t3\n"

    res = node.query(
        f"select a.b.d, _path, a.b, _file, a.e from {table_function('test_subcolumns.jsonl', struct_a)}"
    )
    assert res == "2\tcont/test_subcolumns.jsonl\t(1,2)\ttest_subcolumns.jsonl\t3\n"

    # Column `x` was never written to the blob: zeros are expected.
    res = node.query(
        f"select x.b.d, _path, x.b, _file, x.e from {table_function('test_subcolumns.jsonl', struct_x)}"
    )
    assert res == "0\tcont/test_subcolumns.jsonl\t(0,0)\ttest_subcolumns.jsonl\t0\n"

    # Same query with an explicit default for `x`: the default is expected.
    res = node.query(
        f"select x.b.d, _path, x.b, _file, x.e from {table_function('test_subcolumns.jsonl', struct_x_default)}"
    )
    assert res == "42\tcont/test_subcolumns.jsonl\t(42,42)\ttest_subcolumns.jsonl\t42\n"
@ -683,15 +787,18 @@ def test_read_subcolumns(cluster):
def test_read_from_not_existing_container(cluster):
    """Check the error reported when selecting from a container that does not exist.

    The query targets container ``cont_not_exists`` and the error text returned
    by ``azure_query`` must contain "container does not exist".
    """
    node = cluster.instances["node"]
    storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]

    query = (
        f"select * from azureBlobStorage('{storage_account_url}', 'cont_not_exists', 'test_table.csv', "
        f"'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto')"
    )
    expected_err_msg = "container does not exist"
    # NOTE(review): expect_error is passed as the string "true", kept as in the
    # original; how azure_query interprets it is not visible here -- confirm.
    assert expected_err_msg in azure_query(node, query, expect_error="true")
def test_function_signatures(cluster):
node = cluster.instances["node"]
connection_string = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1;"
storage_account_url = "http://azurite1:10000/devstoreaccount1"
connection_string = cluster.env_variables["AZURITE_CONNECTION_STRING"]
storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
account_name = "devstoreaccount1"
account_key = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
azure_query(
@ -745,7 +852,8 @@ def check_profile_event_for_query(instance, file, profile_event, amount):
query_pattern = f"azureBlobStorage%{file}".replace("'", "\\'")
res = int(
instance.query(
f"select ProfileEvents['{profile_event}'] from system.query_log where query like '%{query_pattern}%' and query not like '%ProfileEvents%' and type = 'QueryFinish' order by query_start_time_microseconds desc limit 1"
f"select ProfileEvents['{profile_event}'] from system.query_log where query like '%{query_pattern}%' and query not like '%ProfileEvents%' "
f"and type = 'QueryFinish' order by query_start_time_microseconds desc limit 1"
)
)
@ -804,15 +912,16 @@ def check_cache(instance, expected_files):
def test_schema_inference_cache(cluster):
node = cluster.instances["node"]
connection_string = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1;"
storage_account_url = "http://azurite1:10000/devstoreaccount1"
connection_string = cluster.env_variables["AZURITE_CONNECTION_STRING"]
storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
account_name = "devstoreaccount1"
account_key = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
node.query("system drop schema cache")
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache0.jsonl', '{account_name}', '{account_key}') select * from numbers(100)",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache0.jsonl', '{account_name}', '{account_key}') "
f"select * from numbers(100)",
)
time.sleep(1)
@ -826,7 +935,8 @@ def test_schema_inference_cache(cluster):
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache0.jsonl', '{account_name}', '{account_key}') select * from numbers(100) settings azure_truncate_on_insert=1",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache0.jsonl', '{account_name}', '{account_key}') "
f"select * from numbers(100) settings azure_truncate_on_insert=1",
)
time.sleep(1)
@ -836,7 +946,8 @@ def test_schema_inference_cache(cluster):
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache1.jsonl', '{account_name}', '{account_key}') select * from numbers(100) settings azure_truncate_on_insert=1",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache1.jsonl', '{account_name}', '{account_key}') "
f"select * from numbers(100) settings azure_truncate_on_insert=1",
)
time.sleep(1)
@ -849,7 +960,8 @@ def test_schema_inference_cache(cluster):
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache2.jsonl', '{account_name}', '{account_key}') select * from numbers(100) settings azure_truncate_on_insert=1",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache2.jsonl', '{account_name}', '{account_key}') "
f"select * from numbers(100) settings azure_truncate_on_insert=1",
)
time.sleep(1)
@ -895,7 +1007,8 @@ def test_schema_inference_cache(cluster):
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache3.jsonl', '{account_name}', '{account_key}') select * from numbers(100) settings azure_truncate_on_insert=1",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache3.jsonl', '{account_name}', '{account_key}') "
f"select * from numbers(100) settings azure_truncate_on_insert=1",
)
time.sleep(1)
@ -919,7 +1032,8 @@ def test_schema_inference_cache(cluster):
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache0.csv', '{account_name}', '{account_key}') select * from numbers(100) settings azure_truncate_on_insert=1",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache0.csv', '{account_name}', '{account_key}') "
f"select * from numbers(100) settings azure_truncate_on_insert=1",
)
time.sleep(1)
@ -943,7 +1057,8 @@ def test_schema_inference_cache(cluster):
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache0.csv', '{account_name}', '{account_key}') select * from numbers(200) settings azure_truncate_on_insert=1",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache0.csv', '{account_name}', '{account_key}') "
f"select * from numbers(200) settings azure_truncate_on_insert=1",
)
time.sleep(1)
@ -958,7 +1073,8 @@ def test_schema_inference_cache(cluster):
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache1.csv', '{account_name}', '{account_key}') select * from numbers(100) settings azure_truncate_on_insert=1",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache1.csv', '{account_name}', '{account_key}') "
f"select * from numbers(100) settings azure_truncate_on_insert=1",
)
time.sleep(1)
@ -991,7 +1107,8 @@ def test_schema_inference_cache(cluster):
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache.parquet', '{account_name}', '{account_key}') select * from numbers(100) settings azure_truncate_on_insert=1",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_cache.parquet', '{account_name}', '{account_key}') "
f"select * from numbers(100) settings azure_truncate_on_insert=1",
)
time.sleep(1)
@ -1007,23 +1124,29 @@ def test_schema_inference_cache(cluster):
def test_filtering_by_file_or_path(cluster):
node = cluster.instances["node"]
storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
azure_query(
node,
"INSERT INTO TABLE FUNCTION azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_filter1.tsv', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x UInt64') select 1",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}','cont', 'test_filter1.tsv', 'devstoreaccount1', "
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x UInt64') select 1",
)
azure_query(
node,
"INSERT INTO TABLE FUNCTION azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_filter2.tsv', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x UInt64') select 2",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}','cont', 'test_filter2.tsv', 'devstoreaccount1', "
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x UInt64') select 2",
)
azure_query(
node,
"INSERT INTO TABLE FUNCTION azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_filter3.tsv', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x UInt64') select 3",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_filter3.tsv', 'devstoreaccount1', "
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x UInt64') select 3",
)
node.query(
f"select count() from azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_filter*.tsv', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x UInt64') where _file = 'test_filter1.tsv'"
f"select count() from azureBlobStorage('{storage_account_url}', 'cont', 'test_filter*.tsv', 'devstoreaccount1', "
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', 'auto', 'x UInt64') "
f"where _file = 'test_filter1.tsv'"
)
node.query("SYSTEM FLUSH LOGS")

View File

@ -49,9 +49,13 @@ def cluster():
cluster.shutdown()
def get_azure_file_content(filename):
def get_azure_file_content(filename, port):
container_name = "cont"
connection_string = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;"
connection_string = (
f"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
f"AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
f"BlobEndpoint=http://127.0.0.1:{port}/devstoreaccount1;"
)
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
container_client = blob_service_client.get_container_client(container_name)
blob_client = container_client.get_blob_client(filename)
@ -61,31 +65,28 @@ def get_azure_file_content(filename):
def test_select_all(cluster):
    """Check that azureBlobStorageCluster returns the same rows as azureBlobStorage.

    Writes a two-row CSV blob through the azureBlobStorage table function, then
    reads it back both directly and through the 'simple_cluster' cluster and
    compares the two results as TSV.
    """
    node = cluster.instances["node_0"]
    # The Azurite endpoint is taken from the cluster environment, not a fixed port.
    port = cluster.env_variables["AZURITE_PORT"]
    storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
    account_name = "devstoreaccount1"
    account_key = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
    blob = "test_cluster_select_all.csv"
    # Positional arguments shared by every table-function call below.
    source_args = f"'{storage_account_url}', 'cont', '{blob}', '{account_name}', '{account_key}'"

    azure_query(
        node,
        f"INSERT INTO TABLE FUNCTION azureBlobStorage({source_args}, 'CSV', 'auto', 'key UInt64, data String') "
        f"VALUES (1, 'a'), (2, 'b')",
    )
    # Printed for debugging only; the content is not asserted here.
    print(get_azure_file_content(blob, port))

    pure_azure = azure_query(
        node,
        f"SELECT * from azureBlobStorage({source_args}, 'CSV', 'auto')",
    )
    print(pure_azure)

    distributed_azure = azure_query(
        node,
        f"SELECT * from azureBlobStorageCluster('simple_cluster', {source_args}, 'CSV', 'auto')",
    )
    print(distributed_azure)

    assert TSV(pure_azure) == TSV(distributed_azure)
@ -93,31 +94,28 @@ def test_select_all(cluster):
def test_count(cluster):
    """Check that count(*) agrees between azureBlobStorage and azureBlobStorageCluster.

    Writes a two-row CSV blob, then counts its rows both directly and through
    the 'simple_cluster' cluster and compares the two results as TSV.
    """
    node = cluster.instances["node_0"]
    # The Azurite endpoint is taken from the cluster environment, not a fixed port.
    port = cluster.env_variables["AZURITE_PORT"]
    storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
    account_name = "devstoreaccount1"
    account_key = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
    blob = "test_cluster_count.csv"
    # Positional arguments shared by every table-function call below,
    # including format, compression and structure.
    source_args = (
        f"'{storage_account_url}', 'cont', '{blob}', '{account_name}', '{account_key}', "
        f"'CSV', 'auto', 'key UInt64'"
    )

    azure_query(
        node,
        f"INSERT INTO TABLE FUNCTION azureBlobStorage({source_args}) VALUES (1), (2)",
    )
    # Printed for debugging only; the content is not asserted here.
    print(get_azure_file_content(blob, port))

    pure_azure = azure_query(
        node,
        f"SELECT count(*) from azureBlobStorage({source_args})",
    )
    print(pure_azure)

    distributed_azure = azure_query(
        node,
        f"SELECT count(*) from azureBlobStorageCluster('simple_cluster', {source_args})",
    )
    print(distributed_azure)

    assert TSV(pure_azure) == TSV(distributed_azure)
@ -125,26 +123,25 @@ def test_count(cluster):
def test_union_all(cluster):
node = cluster.instances["node_0"]
storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
azure_query(
node,
"INSERT INTO TABLE FUNCTION azureBlobStorage("
"'http://azurite1:10000/devstoreaccount1', 'cont', 'test_parquet_union_all', 'devstoreaccount1', "
"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'Parquet', "
"'auto', 'a Int32, b String') VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')",
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_parquet_union_all', 'devstoreaccount1', "
f"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'Parquet', "
f"'auto', 'a Int32, b String') VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')",
)
pure_azure = azure_query(
node,
"""
f"""
SELECT * FROM
(
SELECT * from azureBlobStorage(
'http://azurite1:10000/devstoreaccount1', 'cont', 'test_parquet_union_all', 'devstoreaccount1',
SELECT * from azureBlobStorage('{storage_account_url}', 'cont', 'test_parquet_union_all', 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'Parquet',
'auto', 'a Int32, b String')
UNION ALL
SELECT * from azureBlobStorage(
'http://azurite1:10000/devstoreaccount1', 'cont', 'test_parquet_union_all', 'devstoreaccount1',
'{storage_account_url}', 'cont', 'test_parquet_union_all', 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'Parquet',
'auto', 'a Int32, b String')
)
@ -153,18 +150,18 @@ def test_union_all(cluster):
)
azure_distributed = azure_query(
node,
"""
f"""
SELECT * FROM
(
SELECT * from azureBlobStorageCluster(
'simple_cluster',
'http://azurite1:10000/devstoreaccount1', 'cont', 'test_parquet_union_all', 'devstoreaccount1',
'{storage_account_url}', 'cont', 'test_parquet_union_all', 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'Parquet',
'auto', 'a Int32, b String')
UNION ALL
SELECT * from azureBlobStorageCluster(
'simple_cluster',
'http://azurite1:10000/devstoreaccount1', 'cont', 'test_parquet_union_all', 'devstoreaccount1',
'{storage_account_url}', 'cont', 'test_parquet_union_all', 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'Parquet',
'auto', 'a Int32, b String')
)
@ -177,22 +174,18 @@ def test_union_all(cluster):
def test_skip_unavailable_shards(cluster):
    """Count rows through 'cluster_non_existent_port' with skip_unavailable_shards = 1.

    Writes a two-row blob and expects the cluster table function to return a
    count of 2. The cluster definition is not visible in this file; its name
    suggests that part of it is unreachable.
    """
    node = cluster.instances["node_0"]
    storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
    account_name = "devstoreaccount1"
    account_key = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
    blob = "test_skip_unavailable.csv"
    # Positional arguments shared by the write and the read below.
    source_args = f"'{storage_account_url}', 'cont', '{blob}', '{account_name}', '{account_key}'"

    azure_query(
        node,
        f"INSERT INTO TABLE FUNCTION azureBlobStorage({source_args}, 'auto', 'auto', 'a UInt64') "
        f"VALUES (1), (2)",
    )

    result = azure_query(
        node,
        f"SELECT count(*) from azureBlobStorageCluster('cluster_non_existent_port', {source_args}) "
        f"SETTINGS skip_unavailable_shards = 1",
    )

    assert result == "2\n"
@ -201,21 +194,17 @@ def test_skip_unavailable_shards(cluster):
def test_unset_skip_unavailable_shards(cluster):
    """Count rows through 'cluster_non_existent_port' without setting skip_unavailable_shards.

    Writes a two-row blob and reads the same blob back through the cluster
    table function, expecting a count of 2.
    """
    # Although skip_unavailable_shards is not set, cluster table functions should always skip unavailable shards.
    node = cluster.instances["node_0"]
    storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
    account_name = "devstoreaccount1"
    account_key = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
    # Read back the blob this test writes, so the test does not depend on
    # another test having created 'test_skip_unavailable.csv' first.
    blob = "test_unset_skip_unavailable.csv"
    source_args = f"'{storage_account_url}', 'cont', '{blob}', '{account_name}', '{account_key}'"

    azure_query(
        node,
        f"INSERT INTO TABLE FUNCTION azureBlobStorage({source_args}, 'auto', 'auto', 'a UInt64') "
        f"VALUES (1), (2)",
    )

    result = azure_query(
        node,
        f"SELECT count(*) from azureBlobStorageCluster('cluster_non_existent_port', {source_args})",
    )

    assert result == "2\n"
@ -223,58 +212,53 @@ def test_unset_skip_unavailable_shards(cluster):
def test_cluster_with_named_collection(cluster):
    """Check azureBlobStorageCluster configured through the azure_conf2 named collection.

    Writes a two-row blob with explicit credentials, reads it directly with
    azureBlobStorage, then reads it through 'simple_cluster' using the named
    collection with storage_account_url, container and blob_path overridden.
    The two results are compared as TSV.
    """
    node = cluster.instances["node_0"]
    storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
    account_name = "devstoreaccount1"
    account_key = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
    blob = "test_cluster_with_named_collection.csv"
    # Positional arguments shared by the explicit-credential calls below.
    source_args = f"'{storage_account_url}', 'cont', '{blob}', '{account_name}', '{account_key}'"

    azure_query(
        node,
        f"INSERT INTO TABLE FUNCTION azureBlobStorage({source_args}, 'auto', 'auto', 'a UInt64') "
        f"VALUES (1), (2)",
    )

    pure_azure = azure_query(
        node,
        f"SELECT * from azureBlobStorage({source_args})",
    )

    # The URL is passed as an override so the named collection follows the
    # endpoint taken from the cluster environment.
    azure_cluster = azure_query(
        node,
        f"SELECT * from azureBlobStorageCluster('simple_cluster', azure_conf2, "
        f"storage_account_url = '{storage_account_url}', container='cont', blob_path='{blob}')",
    )

    assert TSV(pure_azure) == TSV(azure_cluster)
def test_partition_parallel_reading_with_cluster(cluster):
    """Write partitioned CSV blobs, then count them through azureBlobStorageCluster.

    Three rows are inserted with PARTITION BY column3, which is expected to
    produce one blob per distinct column3 value (test_tf_3.csv, test_tf_1.csv,
    test_tf_45.csv). Each blob's content is asserted, and a glob read over
    'test_tf_*.csv' through 'simple_cluster' must count 3 rows.
    """
    node = cluster.instances["node_0"]
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    partition_by = "column3"
    values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)"
    # Deliberately a plain string: the {_partition_id} placeholder must reach the server verbatim.
    filename = "test_tf_{_partition_id}.csv"
    port = cluster.env_variables["AZURITE_PORT"]
    storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
    account_name = "devstoreaccount1"
    account_key = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="

    azure_query(
        node,
        f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', '{filename}', '{account_name}', "
        f"'{account_key}', 'CSV', 'auto', '{table_format}') "
        f"PARTITION BY {partition_by} VALUES {values}",
    )

    assert "1,2,3\n" == get_azure_file_content("test_tf_3.csv", port)
    assert "3,2,1\n" == get_azure_file_content("test_tf_1.csv", port)
    assert "78,43,45\n" == get_azure_file_content("test_tf_45.csv", port)

    # The structure reuses table_format so the write and read schemas cannot drift apart.
    azure_cluster = azure_query(
        node,
        f"SELECT count(*) from azureBlobStorageCluster('simple_cluster', azure_conf2, storage_account_url = '{storage_account_url}', "
        f"container='cont', blob_path='test_tf_*.csv', format='CSV', compression='auto', structure='{table_format}')",
    )

    assert azure_cluster == "3\n"

View File

@ -1,7 +1,5 @@
#!/usr/bin/env bash
# Tags: long, no-parallel, no-s3-storage
# FIXME: s3 storage should work OK, it
# reproduces bug which exists not only in S3 version.
# Tags: long, no-parallel
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh

View File

@ -30,7 +30,11 @@ EOF
${CLICKHOUSE_CLIENT} -q "SYSTEM STOP MERGES lazy_mark_test"
${CLICKHOUSE_CLIENT} -q "INSERT INTO lazy_mark_test select number, number % 3, number % 5, number % 10, number % 13, number % 15, number % 17, number % 18, number % 22, number % 25 from numbers(1000000)"
${CLICKHOUSE_CLIENT} -q "SYSTEM DROP MARK CACHE"
${CLICKHOUSE_CLIENT} --log_queries=1 --query_id "${QUERY_ID}" -q "SELECT * FROM lazy_mark_test WHERE n3==11 SETTINGS load_marks_asynchronously=0"
# max_threads=1 is needed because otherwise OpenedFileCache makes ProfileEvents['FileOpen'] nondeterministic
# (usually all threads access the file at overlapping times, and the file is opened just once;
# but sometimes one thread is much slower than the others and ends up opening the same file a second time)
${CLICKHOUSE_CLIENT} --log_queries=1 --query_id "${QUERY_ID}" -q "SELECT * FROM lazy_mark_test WHERE n3==11 SETTINGS load_marks_asynchronously=0, max_threads=1"
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
# Expect 2 open files: n3 marks and n3 data.
${CLICKHOUSE_CLIENT} -q "select ProfileEvents['FileOpen'] from system.query_log where query_id = '${QUERY_ID}' and type = 'QueryFinish' and current_database = currentDatabase()"

View File

@ -8,6 +8,8 @@ CREATE TABLE test_zk_connection_table (
ENGINE ReplicatedMergeTree('zookeeper2:/clickhouse/{database}/02731_zk_connection/{shard}', '{replica}')
ORDER BY tuple();
-- Pin the session time zone to UTC before the time-based checks below
-- (presumably so that connected_time, yesterday() and now() are evaluated in the same zone; TODO confirm).
SET session_timezone = 'UTC';
-- Time-dependent values are reduced to boolean checks rather than selected raw:
-- connected_time must lie between yesterday() and now(), and session_uptime_elapsed_seconds
-- must be within 10 seconds of zookeeperSessionUptime().
select name, host, port, index, is_expired, keeper_api_version, (connected_time between yesterday() and now()),
(abs(session_uptime_elapsed_seconds - zookeeperSessionUptime()) < 10), enabled_feature_flags
from system.zookeeper_connection where name='default';

View File

@ -0,0 +1,6 @@
0 334
1 333
2 333
0 334
1 333
2 333

View File

@ -0,0 +1,19 @@
-- Runs the same grouped SELECT ... FINAL over cluster() twice with custom-key
-- parallel replicas, once for each parallel_replicas_custom_key_filter_type.

DROP TABLE IF EXISTS 02898_parallel_replicas_final;

CREATE TABLE 02898_parallel_replicas_final
(
    x String,
    y Int32
)
ENGINE = ReplacingMergeTree
ORDER BY cityHash64(x);

-- 1000 rows; y cycles through 0, 1, 2.
INSERT INTO 02898_parallel_replicas_final
SELECT
    toString(number),
    number % 3
FROM numbers(1000);

-- Filter type: 'default'.
SELECT
    y,
    count()
FROM cluster(test_cluster_one_shard_three_replicas_localhost, currentDatabase(), 02898_parallel_replicas_final) FINAL
GROUP BY y
ORDER BY y
SETTINGS
    max_parallel_replicas = 3,
    parallel_replicas_custom_key = 'cityHash64(y)',
    parallel_replicas_custom_key_filter_type = 'default';

-- Filter type: 'range'.
SELECT
    y,
    count()
FROM cluster(test_cluster_one_shard_three_replicas_localhost, currentDatabase(), 02898_parallel_replicas_final) FINAL
GROUP BY y
ORDER BY y
SETTINGS
    max_parallel_replicas = 3,
    parallel_replicas_custom_key = 'cityHash64(y)',
    parallel_replicas_custom_key_filter_type = 'range';

DROP TABLE 02898_parallel_replicas_final;

View File

@ -0,0 +1 @@
1

View File

@ -0,0 +1,7 @@
-- Intervals of the same kind are comparable, so this query is expected to succeed:
SELECT INTERVAL 1 SECOND = INTERVAL 1 SECOND;
-- Comparing intervals of different kinds is expected to fail with server error 386.
-- For DAY vs MONTH that is reasonable: a month has no fixed number of days, so there is no well-defined answer:
SELECT INTERVAL 30 DAY < INTERVAL 1 MONTH; -- { serverError 386 }
-- SECOND vs YEAR is currently rejected the same way; this could be relaxed in the future:
SELECT INTERVAL 1 SECOND = INTERVAL 1 YEAR; -- { serverError 386 }
SELECT INTERVAL 1 SECOND <= INTERVAL 1 YEAR; -- { serverError 386 }