Merge branch 'fix-unit-tests' into better-diagnostics-in-functional-tests

This commit is contained in:
Alexey Milovidov 2024-07-26 04:51:46 +02:00
commit 8f91fa8b78
25 changed files with 87 additions and 716 deletions

View File

@ -20,7 +20,7 @@
</max_execution_time> </max_execution_time>
<max_memory_usage> <max_memory_usage>
<max>10G</max> <max>5G</max>
</max_memory_usage> </max_memory_usage>
<table_function_remote_max_addresses> <table_function_remote_max_addresses>

View File

@ -208,7 +208,6 @@ handle SIGPIPE nostop noprint pass
handle SIGTERM nostop noprint pass handle SIGTERM nostop noprint pass
handle SIGUSR1 nostop noprint pass handle SIGUSR1 nostop noprint pass
handle SIGUSR2 nostop noprint pass handle SIGUSR2 nostop noprint pass
handle SIGSEGV nostop pass
handle SIG$RTMIN nostop noprint pass handle SIG$RTMIN nostop noprint pass
info signals info signals
continue continue

View File

@ -20,7 +20,6 @@ handle SIGPIPE nostop noprint pass
handle SIGTERM nostop noprint pass handle SIGTERM nostop noprint pass
handle SIGUSR1 nostop noprint pass handle SIGUSR1 nostop noprint pass
handle SIGUSR2 nostop noprint pass handle SIGUSR2 nostop noprint pass
handle SIGSEGV nostop pass
handle SIG$RTMIN nostop noprint pass handle SIG$RTMIN nostop noprint pass
info signals info signals
continue continue

View File

@ -33,7 +33,7 @@ namespace DB
namespace namespace
{ {
#if defined(OS_LINUX) #if defined(OS_LINUX)
//thread_local size_t write_trace_iteration = 0; thread_local size_t write_trace_iteration = 0;
#endif #endif
/// Even after timer_delete() the signal can be delivered, /// Even after timer_delete() the signal can be delivered,
/// since it does not do anything with pending signals. /// since it does not do anything with pending signals.
@ -57,7 +57,7 @@ namespace
auto saved_errno = errno; /// We must restore previous value of errno in signal handler. auto saved_errno = errno; /// We must restore previous value of errno in signal handler.
#if defined(OS_LINUX) && false //asdqwe #if defined(OS_LINUX)
if (info) if (info)
{ {
int overrun_count = info->si_overrun; int overrun_count = info->si_overrun;
@ -92,7 +92,7 @@ namespace
constexpr bool sanitizer = false; constexpr bool sanitizer = false;
#endif #endif
//asdqwe asynchronous_stack_unwinding = true; asynchronous_stack_unwinding = true;
if (sanitizer || 0 == sigsetjmp(asynchronous_stack_unwinding_signal_jump_buffer, 1)) if (sanitizer || 0 == sigsetjmp(asynchronous_stack_unwinding_signal_jump_buffer, 1))
{ {
stack_trace.emplace(signal_context); stack_trace.emplace(signal_context);

View File

@ -1,5 +1,8 @@
#pragma once #pragma once
/// CLion freezes for a minute on every keypress in any file including this.
#if !defined(__CLION_IDE__)
#include <Common/NamePrompter.h> #include <Common/NamePrompter.h>
#include <Core/BaseSettings.h> #include <Core/BaseSettings.h>
#include <Core/SettingsEnums.h> #include <Core/SettingsEnums.h>
@ -1347,3 +1350,5 @@ struct FormatFactorySettings : public BaseSettings<FormatFactorySettingsTraits>
}; };
} }
#endif

View File

@ -492,48 +492,6 @@ TEST_P(ArchiveReaderAndWriterTest, ManyFilesOnDisk)
} }
} }
TEST_P(ArchiveReaderAndWriterTest, LargeFile)
{
/// Make an archive.
std::string_view contents = "The contents of a.txt\n";
int times = 10000000;
{
auto writer = createArchiveWriter(getPathToArchive());
{
auto out = writer->writeFile("a.txt", times * contents.size());
for (int i = 0; i < times; i++)
writeString(contents, *out);
out->finalize();
}
writer->finalize();
}
/// Read the archive.
auto reader = createArchiveReader(getPathToArchive());
ASSERT_TRUE(reader->fileExists("a.txt"));
auto file_info = reader->getFileInfo("a.txt");
EXPECT_EQ(file_info.uncompressed_size, contents.size() * times);
EXPECT_GT(file_info.compressed_size, 0);
{
auto in = reader->readFile("a.txt", /*throw_on_not_found=*/true);
for (int i = 0; i < times; i++)
ASSERT_TRUE(checkString(String(contents), *in));
}
{
/// Use an enumerator.
auto enumerator = reader->firstFile();
ASSERT_NE(enumerator, nullptr);
EXPECT_EQ(enumerator->getFileName(), "a.txt");
EXPECT_EQ(enumerator->getFileInfo().uncompressed_size, contents.size() * times);
EXPECT_GT(enumerator->getFileInfo().compressed_size, 0);
EXPECT_FALSE(enumerator->nextFile());
}
}
TEST(TarArchiveReaderTest, FileExists) TEST(TarArchiveReaderTest, FileExists)
{ {
String archive_path = "archive.tar"; String archive_path = "archive.tar";

View File

@ -24,7 +24,6 @@
#include <Processors/Sources/NullSource.h> #include <Processors/Sources/NullSource.h>
#include <QueryPipeline/Pipe.h> #include <QueryPipeline/Pipe.h>
#include <Storages/Distributed/DistributedSettings.h> #include <Storages/Distributed/DistributedSettings.h>
#include <Storages/MergeTree/ParallelReplicasReadingCoordinator.h>
#include <Storages/SelectQueryInfo.h> #include <Storages/SelectQueryInfo.h>
#include <Storages/StorageReplicatedMergeTree.h> #include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/StorageSnapshot.h> #include <Storages/StorageSnapshot.h>
@ -517,14 +516,11 @@ void executeQueryWithParallelReplicas(
"`cluster_for_parallel_replicas` setting refers to cluster with several shards. Expected a cluster with one shard"); "`cluster_for_parallel_replicas` setting refers to cluster with several shards. Expected a cluster with one shard");
} }
auto coordinator = std::make_shared<ParallelReplicasReadingCoordinator>(
new_cluster->getShardsInfo().begin()->getAllNodeCount(), settings.parallel_replicas_mark_segment_size);
auto external_tables = new_context->getExternalTables(); auto external_tables = new_context->getExternalTables();
auto read_from_remote = std::make_unique<ReadFromParallelRemoteReplicasStep>( auto read_from_remote = std::make_unique<ReadFromParallelRemoteReplicasStep>(
query_ast, query_ast,
new_cluster, new_cluster,
storage_id, storage_id,
std::move(coordinator),
header, header,
processed_stage, processed_stage,
new_context, new_context,

View File

@ -21,7 +21,7 @@
#include <Client/ConnectionPoolWithFailover.h> #include <Client/ConnectionPoolWithFailover.h>
#include <QueryPipeline/QueryPipelineBuilder.h> #include <QueryPipeline/QueryPipelineBuilder.h>
#include <Parsers/ASTFunction.h> #include <Parsers/ASTFunction.h>
#include <Storages/MergeTree/ParallelReplicasReadingCoordinator.h>
#include <boost/algorithm/string/join.hpp> #include <boost/algorithm/string/join.hpp>
namespace DB namespace DB
@ -362,7 +362,6 @@ ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep(
ASTPtr query_ast_, ASTPtr query_ast_,
ClusterPtr cluster_, ClusterPtr cluster_,
const StorageID & storage_id_, const StorageID & storage_id_,
ParallelReplicasReadingCoordinatorPtr coordinator_,
Block header_, Block header_,
QueryProcessingStage::Enum stage_, QueryProcessingStage::Enum stage_,
ContextMutablePtr context_, ContextMutablePtr context_,
@ -375,7 +374,6 @@ ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep(
, cluster(cluster_) , cluster(cluster_)
, query_ast(query_ast_) , query_ast(query_ast_)
, storage_id(storage_id_) , storage_id(storage_id_)
, coordinator(std::move(coordinator_))
, stage(std::move(stage_)) , stage(std::move(stage_))
, context(context_) , context(context_)
, throttler(throttler_) , throttler(throttler_)
@ -438,6 +436,9 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
shuffled_pool = shard.pool->getShuffledPools(current_settings, priority_func); shuffled_pool = shard.pool->getShuffledPools(current_settings, priority_func);
} }
coordinator
= std::make_shared<ParallelReplicasReadingCoordinator>(max_replicas_to_use, current_settings.parallel_replicas_mark_segment_size);
for (size_t i=0; i < max_replicas_to_use; ++i) for (size_t i=0; i < max_replicas_to_use; ++i)
{ {
IConnections::ReplicaInfo replica_info IConnections::ReplicaInfo replica_info

View File

@ -70,7 +70,6 @@ public:
ASTPtr query_ast_, ASTPtr query_ast_,
ClusterPtr cluster_, ClusterPtr cluster_,
const StorageID & storage_id_, const StorageID & storage_id_,
ParallelReplicasReadingCoordinatorPtr coordinator_,
Block header_, Block header_,
QueryProcessingStage::Enum stage_, QueryProcessingStage::Enum stage_,
ContextMutablePtr context_, ContextMutablePtr context_,

View File

@ -596,12 +596,12 @@ Pipe ReadFromSystemNumbersStep::makePipe()
numbers_storage.step, numbers_storage.step,
step_between_chunks); step_between_chunks);
if (numbers_storage.limit && i == 0) if (end && i == 0)
{ {
auto rows_appr = itemCountInRange(numbers_storage.offset, *numbers_storage.limit, numbers_storage.step); UInt64 rows_approx = itemCountInRange(numbers_storage.offset, *end, numbers_storage.step);
if (limit > 0 && limit < rows_appr) if (limit > 0 && limit < rows_approx)
rows_appr = query_info_limit; rows_approx = query_info_limit;
source->addTotalRowsApprox(rows_appr); source->addTotalRowsApprox(rows_approx);
} }
pipe.addSource(std::move(source)); pipe.addSource(std::move(source));

View File

@ -33,7 +33,10 @@ def create_tables(cluster, table_name):
@pytest.mark.parametrize("skip_unavailable_shards", [1, 0]) @pytest.mark.parametrize("skip_unavailable_shards", [1, 0])
def test_skip_all_replicas(start_cluster, skip_unavailable_shards): @pytest.mark.parametrize("max_parallel_replicas", [2, 3, 100])
def test_skip_all_replicas(
start_cluster, skip_unavailable_shards, max_parallel_replicas
):
cluster_name = "test_1_shard_3_unavaliable_replicas" cluster_name = "test_1_shard_3_unavaliable_replicas"
table_name = "tt" table_name = "tt"
create_tables(cluster_name, table_name) create_tables(cluster_name, table_name)
@ -43,7 +46,7 @@ def test_skip_all_replicas(start_cluster, skip_unavailable_shards):
f"SELECT key, count() FROM {table_name} GROUP BY key ORDER BY key", f"SELECT key, count() FROM {table_name} GROUP BY key ORDER BY key",
settings={ settings={
"allow_experimental_parallel_reading_from_replicas": 2, "allow_experimental_parallel_reading_from_replicas": 2,
"max_parallel_replicas": 3, "max_parallel_replicas": max_parallel_replicas,
"cluster_for_parallel_replicas": cluster_name, "cluster_for_parallel_replicas": cluster_name,
"skip_unavailable_shards": skip_unavailable_shards, "skip_unavailable_shards": skip_unavailable_shards,
}, },

View File

@ -1,13 +0,0 @@
<clickhouse>
<hdfs>
<hadoop_kerberos_keytab>/tmp/keytab/clickhouse.keytab</hadoop_kerberos_keytab>
<hadoop_kerberos_principal>root@TEST.CLICKHOUSE.TECH</hadoop_kerberos_principal>
<hadoop_security_authentication>kerberos</hadoop_security_authentication>
</hdfs>
<hdfs_suser>
<hadoop_kerberos_principal>specuser@TEST.CLICKHOUSE.TECH</hadoop_kerberos_principal>
</hdfs_suser>
<hdfs_dedicatedcachepath>
<hadoop_security_kerberos_ticket_cache_path>/tmp/kerb_cache</hadoop_security_kerberos_ticket_cache_path>
</hdfs_dedicatedcachepath>
</clickhouse>

View File

@ -1,280 +0,0 @@
#!/bin/bash
: "${HADOOP_PREFIX:=/usr/local/hadoop}"
cat >> $HADOOP_PREFIX/etc/hadoop/hadoop-env.sh <<EOF
export HADOOP_SECURE_DN_USER=hdfs
export HADOOP_SECURE_DN_PID_DIR=$HADOOP_PREFIX/pid
export HADOOP_SECURE_DN_LOG_DIR=$HADOOP_PREFIX/logs/hdfs
export JSVC_HOME=$HADOOP_PREFIX/sbin
EOF
$HADOOP_PREFIX/etc/hadoop/hadoop-env.sh
mkdir -p "${HADOOP_SECURE_DN_PID_DIR}"
mkdir -p "${HADOOP_SECURE_DN_LOG_DIR}"
rm /tmp/*.pid
# installing libraries if any - (resource urls added comma separated to the ACP system variable)
cd "${HADOOP_PREFIX}/share/hadoop/common" || exit
for cp in ${ACP//,/ }; do echo "== ${cp}"; curl -LO "${cp}" ; done;
cd - || exit
# altering the core-site configuration
sed "s/HOSTNAME/${HOSTNAME}/" /usr/local/hadoop/etc/hadoop/core-site.xml.template | grep -v '/configuration' > /usr/local/hadoop/etc/hadoop/core-site.xml
cat >> /usr/local/hadoop/etc/hadoop/core-site.xml << EOF
<property>
<name>hadoop.security.authentication</name>
<value>kerberos</value> <!-- A value of "simple" would disable security. -->
</property>
<property>
<name>hadoop.security.authorization</name>
<value>true</value>
</property>
<property>
<name>fs.defaultFS</name>
<value>hdfs://kerberizedhdfs1:9010</value>
</property>
<property>
<name>fs.default.name</name>
<value>hdfs://kerberizedhdfs1:9010</value>
</property>
<!--
<property>
<name>hadoop.rpc.protection</name>
<value>privacy</value>
</property>
-->
</configuration>
EOF
cat > /usr/local/hadoop/etc/hadoop/hdfs-site.xml << EOF
<configuration>
<!--
<property>
<name>dfs.permissions.enabled</name>
<value>false</value>
</property>
-->
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<!-- General HDFS security config -->
<property>
<name>dfs.block.access.token.enable</name>
<value>true</value>
</property>
<!-- NameNode security config -->
<property>
<name>dfs.namenode.keytab.file</name>
<value>/usr/local/hadoop/etc/hadoop/conf/hdfs.keytab</value> <!-- path to the HDFS keytab -->
</property>
<property>
<name>dfs.namenode.kerberos.principal</name>
<value>hdfs/_HOST@TEST.CLICKHOUSE.TECH</value>
</property>
<property>
<name>dfs.namenode.kerberos.internal.spnego.principal</name>
<value>HTTP/_HOST@TEST.CLICKHOUSE.TECH</value>
</property>
<!-- Secondary NameNode security config -->
<property>
<name>dfs.secondary.namenode.keytab.file</name>
<value>/usr/local/hadoop/etc/hadoop/conf/hdfs.keytab</value> <!-- path to the HDFS keytab -->
</property>
<property>
<name>dfs.secondary.namenode.kerberos.principal</name>
<value>hdfs/_HOST@TEST.CLICKHOUSE.TECH</value>
</property>
<property>
<name>dfs.secondary.namenode.kerberos.internal.spnego.principal</name>
<value>HTTP/_HOST@TEST.CLICKHOUSE.TECH</value>
</property>
<!-- DataNode security config
<property>
<name>dfs.data.transfer.protectionл</name>
<value>integrity</value>
</property>
-->
<property>
<name>dfs.datanode.data.dir.perm</name>
<value>700</value>
</property>
<property>
<name>dfs.datanode.address</name>
<value>0.0.0.0:1004</value>
</property>
<property>
<name>dfs.datanode.http.address</name>
<value>0.0.0.0:1006</value>
</property>
<!-- If the port is 0 then the server will start on a free port. -->
<property>
<name>dfs.datanode.ipc.address</name>
<value>0.0.0.0:0</value>
</property>
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>0.0.0.0:0</value>
</property>
<property>
<name>dfs.namenode.backup.address</name>
<value>0.0.0.0:0</value>
</property>
<property>
<name>dfs.namenode.backup.http-address</name>
<value>0.0.0.0:0</value>
</property>
<!--
<property>
<name>dfs.http.policy</name>
<value>HTTPS_ONLY</value>
</property>
-->
<property>
<name>dfs.datanode.keytab.file</name>
<value>/usr/local/hadoop/etc/hadoop/conf/hdfs.keytab</value> <!-- path to the HDFS keytab -->
</property>
<property>
<name>dfs.datanode.kerberos.principal</name>
<value>hdfs/_HOST@TEST.CLICKHOUSE.TECH</value>
</property>
<!-- Web Authentication config -->
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
</property>
<property>
<name>dfs.encrypt.data.transfer</name>
<value>false</value>
</property>
<property>
<name>dfs.web.authentication.kerberos.principal</name>
<value>HTTP/_HOST@TEST.CLICKHOUSE.TECH</value>
</property>
<property>
<name>dfs.web.authentication.kerberos.keytab</name>
<value>/usr/local/hadoop/etc/hadoop/conf/hdfs.keytab</value> <!-- path to the HDFS keytab -->
</property>
</configuration>
EOF
# cat > /usr/local/hadoop/etc/hadoop/ssl-server.xml << EOF
# <configuration>
# <property>
# <name>ssl.server.truststore.location</name>
# <value>/usr/local/hadoop/etc/hadoop/conf/hdfs.jks</value>
# </property>
# <property>
# <name>ssl.server.truststore.password</name>
# <value>masterkey</value>
# </property>
# <property>
# <name>ssl.server.keystore.location</name>
# <value>/usr/local/hadoop/etc/hadoop/conf/hdfs.jks</value>
# </property>
# <property>
# <name>ssl.server.keystore.password</name>
# <value>masterkey</value>
# </property>
# <property>
# <name>ssl.server.keystore.keypassword</name>
# <value>masterkey</value>
# </property>
# </configuration>
# EOF
cat > /usr/local/hadoop/etc/hadoop/log4j.properties << EOF
# Set everything to be logged to the console
log4j.rootCategory=DEBUG, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Set the default spark-shell log level to WARN. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=INFO
# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=DEBUG
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
log4j.logger.org.apache.spark.deploy=DEBUG
log4j.logger.org.apache.spark.executor=DEBUG
log4j.logger.org.apache.spark.scheduler=DEBUG
EOF
useradd -u 1098 hdfs
# keytool -genkey -alias kerberized_hdfs1.test.clickhouse.com -keyalg rsa -keysize 1024 -dname "CN=kerberized_hdfs1.test.clickhouse.com" -keypass masterkey -keystore /usr/local/hadoop/etc/hadoop/conf/hdfs.jks -storepass masterkey
keytool -genkey -alias kerberizedhdfs1 -keyalg rsa -keysize 1024 -dname "CN=kerberizedhdfs1" -keypass masterkey -keystore /usr/local/hadoop/etc/hadoop/conf/hdfs.jks -storepass masterkey
chmod g+r /usr/local/hadoop/etc/hadoop/conf/hdfs.jks
service sshd start
# yum --quiet --assumeyes install krb5-workstation.x86_64
# yum --quiet --assumeyes install tcpdump
# cd /tmp
# curl http://archive.apache.org/dist/commons/daemon/source/commons-daemon-1.0.15-src.tar.gz -o commons-daemon-1.0.15-src.tar.gz
# tar xzf commons-daemon-1.0.15-src.tar.gz
# cd commons-daemon-1.0.15-src/src/native/unix
# ./configure && make
# cp ./jsvc /usr/local/hadoop/sbin
until kinit -kt /usr/local/hadoop/etc/hadoop/conf/hdfs.keytab hdfs/kerberizedhdfs1@TEST.CLICKHOUSE.TECH; do sleep 2; done
echo "KDC is up and ready to go... starting up"
$HADOOP_PREFIX/sbin/start-dfs.sh
$HADOOP_PREFIX/sbin/start-yarn.sh
$HADOOP_PREFIX/sbin/mr-jobhistory-daemon.sh start historyserver
chmod a+r /usr/local/hadoop/etc/hadoop/conf/hdfs.keytab # create dedicated keytab for hdfsuser
$HADOOP_PREFIX/sbin/start-secure-dns.sh
sleep 3
/usr/local/hadoop/bin/hdfs dfsadmin -safemode leave
/usr/local/hadoop/bin/hdfs dfs -mkdir /user/specuser
/usr/local/hadoop/bin/hdfs dfs -chown specuser /user/specuser
echo "chown_completed" | /usr/local/hadoop/bin/hdfs dfs -appendToFile - /preparations_done_marker
kdestroy
# adduser --groups hdfs hdfsuser
# /usr/local/hadoop/sbin/hadoop-daemon.sh --config /usr/local/hadoop/etc/hadoop/ --script /usr/local/hadoop/sbin/hdfs start namenode
# /usr/local/hadoop/sbin/hadoop-daemon.sh --config /usr/local/hadoop/etc/hadoop/ --script /usr/local/hadoop/sbin/hdfs start datanode
if [[ $1 == "-d" ]]; then
while true; do sleep 1000; done
fi
if [[ $1 == "-bash" ]]; then
/bin/bash
fi

View File

@ -1,140 +0,0 @@
#!/bin/bash
set -x # trace
: "${REALM:=TEST.CLICKHOUSE.TECH}"
: "${DOMAIN_REALM:=test.clickhouse.com}"
: "${KERB_MASTER_KEY:=masterkey}"
: "${KERB_ADMIN_USER:=admin}"
: "${KERB_ADMIN_PASS:=admin}"
create_config() {
: "${KDC_ADDRESS:=$(hostname -f)}"
cat>/etc/krb5.conf<<EOF
[logging]
default = FILE:/var/log/kerberos/krb5libs.log
kdc = FILE:/var/log/kerberos/krb5kdc.log
admin_server = FILE:/var/log/kerberos/kadmind.log
[libdefaults]
default_realm = $REALM
dns_lookup_realm = false
dns_lookup_kdc = false
ticket_lifetime = 15d
renew_lifetime = 15d
forwardable = true
# WARNING: We use weaker key types to simplify testing as stronger key types
# require the enhanced security JCE policy file to be installed. You should
# NOT run with this configuration in production or any real environment. You
# have been warned.
default_tkt_enctypes = des-cbc-md5 des-cbc-crc des3-cbc-sha1
default_tgs_enctypes = des-cbc-md5 des-cbc-crc des3-cbc-sha1
permitted_enctypes = des-cbc-md5 des-cbc-crc des3-cbc-sha1
[realms]
$REALM = {
kdc = $KDC_ADDRESS
admin_server = $KDC_ADDRESS
}
[domain_realm]
.$DOMAIN_REALM = $REALM
$DOMAIN_REALM = $REALM
EOF
cat>/var/kerberos/krb5kdc/kdc.conf<<EOF
[kdcdefaults]
kdc_ports = 88
kdc_tcp_ports = 88
[realms]
$REALM = {
acl_file = /var/kerberos/krb5kdc/kadm5.acl
dict_file = /usr/share/dict/words
admin_keytab = /var/kerberos/krb5kdc/kadm5.keytab
# WARNING: We use weaker key types to simplify testing as stronger key types
# require the enhanced security JCE policy file to be installed. You should
# NOT run with this configuration in production or any real environment. You
# have been warned.
master_key_type = des3-hmac-sha1
supported_enctypes = arcfour-hmac:normal des3-hmac-sha1:normal des-cbc-crc:normal des:normal des:v4 des:norealm des:onlyrealm des:afs3
default_principal_flags = +preauth
}
EOF
}
create_db() {
/usr/sbin/kdb5_util -P $KERB_MASTER_KEY -r $REALM create -s
}
start_kdc() {
mkdir -p /var/log/kerberos
/etc/rc.d/init.d/krb5kdc start
/etc/rc.d/init.d/kadmin start
chkconfig krb5kdc on
chkconfig kadmin on
}
restart_kdc() {
/etc/rc.d/init.d/krb5kdc restart
/etc/rc.d/init.d/kadmin restart
}
create_admin_user() {
kadmin.local -q "addprinc -pw $KERB_ADMIN_PASS $KERB_ADMIN_USER/admin"
echo "*/admin@$REALM *" > /var/kerberos/krb5kdc/kadm5.acl
}
create_keytabs() {
rm /tmp/keytab/*.keytab
# kadmin.local -q "addprinc -randkey hdfs/kerberizedhdfs1.${DOMAIN_REALM}@${REALM}"
# kadmin.local -q "ktadd -norandkey -k /tmp/keytab/hdfs.keytab hdfs/kerberizedhdfs1.${DOMAIN_REALM}@${REALM}"
# kadmin.local -q "addprinc -randkey HTTP/kerberizedhdfs1.${DOMAIN_REALM}@${REALM}"
# kadmin.local -q "ktadd -norandkey -k /tmp/keytab/hdfs.keytab HTTP/kerberizedhdfs1.${DOMAIN_REALM}@${REALM}"
kadmin.local -q "addprinc -randkey hdfs/kerberizedhdfs1@${REALM}"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/hdfs.keytab hdfs/kerberizedhdfs1@${REALM}"
kadmin.local -q "addprinc -randkey HTTP/kerberizedhdfs1@${REALM}"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/hdfs.keytab HTTP/kerberizedhdfs1@${REALM}"
kadmin.local -q "addprinc -randkey hdfsuser/node1@${REALM}"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/clickhouse.keytab hdfsuser/node1@${REALM}"
kadmin.local -q "addprinc -randkey hdfsuser@${REALM}"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/clickhouse.keytab hdfsuser@${REALM}"
kadmin.local -q "addprinc -randkey root@${REALM}"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/clickhouse.keytab root@${REALM}"
kadmin.local -q "addprinc -randkey specuser@${REALM}"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/clickhouse.keytab specuser@${REALM}"
chmod g+r /tmp/keytab/clickhouse.keytab
}
main() {
if [ ! -f /kerberos_initialized ]; then
create_config
create_db
create_admin_user
start_kdc
touch /kerberos_initialized
fi
if [ ! -f /var/kerberos/krb5kdc/principal ]; then
while true; do sleep 1000; done
else
start_kdc
create_keytabs
tail -F /var/log/kerberos/krb5kdc.log
fi
}
[[ "$0" == "${BASH_SOURCE[0]}" ]] && main "$@"

View File

@ -1,25 +0,0 @@
[logging]
default = FILE:/var/log/krb5libs.log
kdc = FILE:/var/log/krb5kdc.log
admin_server = FILE:/var/log/kadmind.log
[libdefaults]
default_realm = TEST.CLICKHOUSE.TECH
dns_lookup_realm = false
dns_lookup_kdc = false
ticket_lifetime = 5s
forwardable = true
rdns = false
default_tgs_enctypes = des3-hmac-sha1
default_tkt_enctypes = des3-hmac-sha1
permitted_enctypes = des3-hmac-sha1
[realms]
TEST.CLICKHOUSE.TECH = {
kdc = hdfskerberos
admin_server = hdfskerberos
}
[domain_realm]
.test.clickhouse.com = TEST.CLICKHOUSE.TECH
test.clickhouse.com = TEST.CLICKHOUSE.TECH

View File

@ -1,24 +0,0 @@
[logging]
default = FILE:/var/log/krb5libs.log
kdc = FILE:/var/log/krb5kdc.log
admin_server = FILE:/var/log/kadmind.log
[libdefaults]
default_realm = TEST.CLICKHOUSE.TECH
dns_lookup_realm = false
dns_lookup_kdc = false
ticket_lifetime = 15d
forwardable = true
default_tgs_enctypes = des3-hmac-sha1
default_tkt_enctypes = des3-hmac-sha1
permitted_enctypes = des3-hmac-sha1
[realms]
TEST.CLICKHOUSE.TECH = {
kdc = hdfskerberos
admin_server = hdfskerberos
}
[domain_realm]
.test.clickhouse.com = TEST.CLICKHOUSE.TECH
test.clickhouse.com = TEST.CLICKHOUSE.TECH

View File

@ -1,155 +0,0 @@
import time
import pytest
import os
from helpers.cluster import ClickHouseCluster, is_arm
import subprocess
if is_arm():
pytestmark = pytest.mark.skip
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1",
with_kerberized_hdfs=True,
user_configs=[],
main_configs=["configs/hdfs.xml"],
)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
except Exception as ex:
print(ex)
raise ex
finally:
cluster.shutdown()
def test_read_table(started_cluster):
hdfs_api = started_cluster.hdfs_api
data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
hdfs_api.write_data("/simple_table_function", data)
api_read = hdfs_api.read_data("/simple_table_function")
assert api_read == data
select_read = node1.query(
"select * from hdfs('hdfs://kerberizedhdfs1:9010/simple_table_function', 'TSV', 'id UInt64, text String, number Float64')"
)
assert select_read == data
def test_read_write_storage(started_cluster):
hdfs_api = started_cluster.hdfs_api
node1.query(
"create table SimpleHDFSStorage2 (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9010/simple_storage1', 'TSV')"
)
node1.query("insert into SimpleHDFSStorage2 values (1, 'Mark', 72.53)")
api_read = hdfs_api.read_data("/simple_storage1")
assert api_read == "1\tMark\t72.53\n"
select_read = node1.query("select * from SimpleHDFSStorage2")
assert select_read == "1\tMark\t72.53\n"
def test_write_storage_not_expired(started_cluster):
hdfs_api = started_cluster.hdfs_api
node1.query(
"create table SimpleHDFSStorageNotExpired (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9010/simple_storage_not_expired', 'TSV')"
)
time.sleep(15) # wait for ticket expiration
node1.query("insert into SimpleHDFSStorageNotExpired values (1, 'Mark', 72.53)")
api_read = hdfs_api.read_data("/simple_storage_not_expired")
assert api_read == "1\tMark\t72.53\n"
select_read = node1.query("select * from SimpleHDFSStorageNotExpired")
assert select_read == "1\tMark\t72.53\n"
def test_two_users(started_cluster):
hdfs_api = started_cluster.hdfs_api
node1.query(
"create table HDFSStorOne (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9010/storage_user_one', 'TSV')"
)
node1.query("insert into HDFSStorOne values (1, 'Real', 86.00)")
node1.query(
"create table HDFSStorTwo (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://suser@kerberizedhdfs1:9010/user/specuser/storage_user_two', 'TSV')"
)
node1.query("insert into HDFSStorTwo values (1, 'Ideal', 74.00)")
select_read_1 = node1.query(
"select * from hdfs('hdfs://kerberizedhdfs1:9010/user/specuser/storage_user_two', 'TSV', 'id UInt64, text String, number Float64')"
)
select_read_2 = node1.query(
"select * from hdfs('hdfs://suser@kerberizedhdfs1:9010/storage_user_one', 'TSV', 'id UInt64, text String, number Float64')"
)
def test_read_table_expired(started_cluster):
hdfs_api = started_cluster.hdfs_api
data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
hdfs_api.write_data("/simple_table_function_relogin", data)
started_cluster.pause_container("hdfskerberos")
time.sleep(15)
try:
select_read = node1.query(
"select * from hdfs('hdfs://reloginuser&kerberizedhdfs1:9010/simple_table_function', 'TSV', 'id UInt64, text String, number Float64')"
)
assert False, "Exception have to be thrown"
except Exception as ex:
assert "DB::Exception: KerberosInit failure:" in str(ex)
started_cluster.unpause_container("hdfskerberos")
def test_prohibited(started_cluster):
node1.query(
"create table HDFSStorTwoProhibited (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://suser@kerberizedhdfs1:9010/storage_user_two_prohibited', 'TSV')"
)
try:
node1.query("insert into HDFSStorTwoProhibited values (1, 'SomeOne', 74.00)")
assert False, "Exception have to be thrown"
except Exception as ex:
assert (
"Unable to open HDFS file: /storage_user_two_prohibited (hdfs://suser@kerberizedhdfs1:9010/storage_user_two_prohibited) error: Permission denied: user=specuser, access=WRITE"
in str(ex)
)
def test_cache_path(started_cluster):
node1.query(
"create table HDFSStorCachePath (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://dedicatedcachepath@kerberizedhdfs1:9010/storage_dedicated_cache_path', 'TSV')"
)
try:
node1.query("insert into HDFSStorCachePath values (1, 'FatMark', 92.53)")
assert False, "Exception have to be thrown"
except Exception as ex:
assert (
"DB::Exception: hadoop.security.kerberos.ticket.cache.path cannot be set per user"
in str(ex)
)
if __name__ == "__main__":
cluster.start()
input("Cluster created, press any key to destroy...")
cluster.shutdown()

View File

@ -4,8 +4,13 @@
</settings> </settings>
<create_query>CREATE TABLE t (x UInt64, d32 Decimal32(3), d64 Decimal64(4), d128 Decimal128(5)) ENGINE = Memory</create_query> <create_query>CREATE TABLE t (x UInt64, d32 Decimal32(3), d64 Decimal64(4), d128 Decimal128(5)) ENGINE = Memory</create_query>
<!-- use less threads to save memory --> <!-- use less threads and several queries to save memory -->
<fill_query>INSERT INTO t SELECT number AS x, x % 1000000 AS d32, x AS d64, x d128 FROM numbers_mt(500000000) SETTINGS max_threads = 8</fill_query> <fill_query>INSERT INTO t SELECT number AS x, x % 1000000 AS d32, x AS d64, x d128 FROM numbers_mt(100000000) SETTINGS max_threads = 2</fill_query>
<fill_query>INSERT INTO t SELECT number AS x, x % 1000000 AS d32, x AS d64, x d128 FROM numbers_mt(100000000, 100000000) SETTINGS max_threads = 2</fill_query>
<fill_query>INSERT INTO t SELECT number AS x, x % 1000000 AS d32, x AS d64, x d128 FROM numbers_mt(200000000, 100000000) SETTINGS max_threads = 2</fill_query>
<fill_query>INSERT INTO t SELECT number AS x, x % 1000000 AS d32, x AS d64, x d128 FROM numbers_mt(300000000, 100000000) SETTINGS max_threads = 2</fill_query>
<fill_query>INSERT INTO t SELECT number AS x, x % 1000000 AS d32, x AS d64, x d128 FROM numbers_mt(400000000, 100000000) SETTINGS max_threads = 2</fill_query>
<drop_query>DROP TABLE IF EXISTS t</drop_query> <drop_query>DROP TABLE IF EXISTS t</drop_query>
<query>SELECT min(d32), max(d32), argMin(x, d32), argMax(x, d32) FROM t</query> <query>SELECT min(d32), max(d32), argMin(x, d32), argMax(x, d32) FROM t</query>

View File

@ -1,6 +1,6 @@
12 -> 102 12 -> 102
13 -> 103 13 -> 103
14 -> -1 14 -> -1
12(r) -> 102 12 (after reloading) -> 102
13(r) -> 103 13 (after reloading) -> 103
14(r) -> 104 14 (after reloading) -> 104

View File

@ -1,4 +1,6 @@
#!/usr/bin/env bash #!/usr/bin/env bash
# Tags: no-random-settings
# Dictionaries are updated using the server time.
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh # shellcheck source=../shell_config.sh
@ -6,8 +8,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
set -e -o pipefail set -e -o pipefail
# NOTE: dictionaries TTLs works with server timezone, so session_timeout cannot be used $CLICKHOUSE_CLIENT --multiquery <<EOF
$CLICKHOUSE_CLIENT --session_timezone '' --multiquery <<EOF
CREATE TABLE ${CLICKHOUSE_DATABASE}.table(x Int64, y Int64, insert_time DateTime) ENGINE = MergeTree ORDER BY tuple(); CREATE TABLE ${CLICKHOUSE_DATABASE}.table(x Int64, y Int64, insert_time DateTime) ENGINE = MergeTree ORDER BY tuple();
INSERT INTO ${CLICKHOUSE_DATABASE}.table VALUES (12, 102, now()); INSERT INTO ${CLICKHOUSE_DATABASE}.table VALUES (12, 102, now());
@ -28,16 +29,21 @@ $CLICKHOUSE_CLIENT --query "SELECT '12 -> ', dictGetInt64('${CLICKHOUSE_DATABASE
$CLICKHOUSE_CLIENT --query "INSERT INTO ${CLICKHOUSE_DATABASE}.table VALUES (13, 103, now())" $CLICKHOUSE_CLIENT --query "INSERT INTO ${CLICKHOUSE_DATABASE}.table VALUES (13, 103, now())"
$CLICKHOUSE_CLIENT --query "INSERT INTO ${CLICKHOUSE_DATABASE}.table VALUES (14, 104, now() - INTERVAL 1 DAY)" $CLICKHOUSE_CLIENT --query "INSERT INTO ${CLICKHOUSE_DATABASE}.table VALUES (14, 104, now() - INTERVAL 1 DAY)"
# Wait when the dictionary will update the value for 13 on its own:
while [ "$(${CLICKHOUSE_CLIENT} --query "SELECT dictGetInt64('${CLICKHOUSE_DATABASE}.dict', 'y', toUInt64(13))")" = -1 ] while [ "$(${CLICKHOUSE_CLIENT} --query "SELECT dictGetInt64('${CLICKHOUSE_DATABASE}.dict', 'y', toUInt64(13))")" = -1 ]
do do
sleep 0.5 sleep 0.5
done done
$CLICKHOUSE_CLIENT --query "SELECT '13 -> ', dictGetInt64('${CLICKHOUSE_DATABASE}.dict', 'y', toUInt64(13))" $CLICKHOUSE_CLIENT --query "SELECT '13 -> ', dictGetInt64('${CLICKHOUSE_DATABASE}.dict', 'y', toUInt64(13))"
# By the way, the value for 14 is expected to not be updated at this moment,
# because the values were selected by the update field insert_time, and for 14 it was set as one day ago.
$CLICKHOUSE_CLIENT --query "SELECT '14 -> ', dictGetInt64('${CLICKHOUSE_DATABASE}.dict', 'y', toUInt64(14))" $CLICKHOUSE_CLIENT --query "SELECT '14 -> ', dictGetInt64('${CLICKHOUSE_DATABASE}.dict', 'y', toUInt64(14))"
# SYSTEM RELOAD DICTIONARY reloads it completely, regardless of the update field, so we will see new values, even for key 14.
$CLICKHOUSE_CLIENT --query "SYSTEM RELOAD DICTIONARY '${CLICKHOUSE_DATABASE}.dict'" $CLICKHOUSE_CLIENT --query "SYSTEM RELOAD DICTIONARY '${CLICKHOUSE_DATABASE}.dict'"
$CLICKHOUSE_CLIENT --query "SELECT '12(r) -> ', dictGetInt64('${CLICKHOUSE_DATABASE}.dict', 'y', toUInt64(12))" $CLICKHOUSE_CLIENT --query "SELECT '12 (after reloading) -> ', dictGetInt64('${CLICKHOUSE_DATABASE}.dict', 'y', toUInt64(12))"
$CLICKHOUSE_CLIENT --query "SELECT '13(r) -> ', dictGetInt64('${CLICKHOUSE_DATABASE}.dict', 'y', toUInt64(13))" $CLICKHOUSE_CLIENT --query "SELECT '13 (after reloading) -> ', dictGetInt64('${CLICKHOUSE_DATABASE}.dict', 'y', toUInt64(13))"
$CLICKHOUSE_CLIENT --query "SELECT '14(r) -> ', dictGetInt64('${CLICKHOUSE_DATABASE}.dict', 'y', toUInt64(14))" $CLICKHOUSE_CLIENT --query "SELECT '14 (after reloading) -> ', dictGetInt64('${CLICKHOUSE_DATABASE}.dict', 'y', toUInt64(14))"

View File

@ -13,7 +13,7 @@ function insert1()
{ {
local TIMELIMIT=$((SECONDS+$1)) local TIMELIMIT=$((SECONDS+$1))
while [ $SECONDS -lt "$TIMELIMIT" ]; do while [ $SECONDS -lt "$TIMELIMIT" ]; do
${MY_CLICKHOUSE_CLIENT} --wait_for_async_insert 0 -q 'INSERT INTO async_inserts_race FORMAT CSV 1,"a"' ${MY_CLICKHOUSE_CLIENT} --insert_keeper_fault_injection_probability=0 --wait_for_async_insert 0 -q 'INSERT INTO async_inserts_race FORMAT CSV 1,"a"'
done done
} }
@ -21,7 +21,7 @@ function insert2()
{ {
local TIMELIMIT=$((SECONDS+$1)) local TIMELIMIT=$((SECONDS+$1))
while [ $SECONDS -lt "$TIMELIMIT" ]; do while [ $SECONDS -lt "$TIMELIMIT" ]; do
${MY_CLICKHOUSE_CLIENT} --wait_for_async_insert 0 -q 'INSERT INTO async_inserts_race FORMAT JSONEachRow {"id": 5, "s": "e"} {"id": 6, "s": "f"}' ${MY_CLICKHOUSE_CLIENT} --insert_keeper_fault_injection_probability=0 --wait_for_async_insert 0 -q 'INSERT INTO async_inserts_race FORMAT JSONEachRow {"id": 5, "s": "e"} {"id": 6, "s": "f"}'
done done
} }
@ -29,7 +29,7 @@ function insert3()
{ {
local TIMELIMIT=$((SECONDS+$1)) local TIMELIMIT=$((SECONDS+$1))
while [ $SECONDS -lt "$TIMELIMIT" ]; do while [ $SECONDS -lt "$TIMELIMIT" ]; do
${MY_CLICKHOUSE_CLIENT} --wait_for_async_insert 1 -q "INSERT INTO async_inserts_race VALUES (7, 'g') (8, 'h')" & ${MY_CLICKHOUSE_CLIENT} --insert_keeper_fault_injection_probability=0 --wait_for_async_insert 1 -q "INSERT INTO async_inserts_race VALUES (7, 'g') (8, 'h')" &
sleep 0.05 sleep 0.05
done done

View File

@ -0,0 +1,35 @@
SET alter_sync = 2;
SET max_parallel_replicas = 3, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', parallel_replicas_for_non_replicated_merge_tree = true;
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t1__fuzz_26;
CREATE TABLE t1__fuzz_26 (`a` Nullable(Float64), `b` Nullable(Float32), `pk` Int64) ENGINE = MergeTree ORDER BY pk;
CREATE TABLE t1 ( a Float64, b Int64, pk String) Engine = MergeTree() ORDER BY pk;
ALTER TABLE t1
(MODIFY COLUMN `a` Float64 TTL toDateTime(b) + toIntervalMonth(viewExplain('EXPLAIN', 'actions = 1', (
SELECT
toIntervalMonth(1),
2
FROM t1__fuzz_26
GROUP BY
toFixedString('%Prewhere%', 10),
toNullable(12)
WITH ROLLUP
)), 1)) settings allow_experimental_parallel_reading_from_replicas = 1; -- { serverError INCORRECT_RESULT_OF_SCALAR_SUBQUERY }
ALTER TABLE t1
(MODIFY COLUMN `a` Float64 TTL toDateTime(b) + toIntervalMonth(viewExplain('EXPLAIN', 'actions = 1', (
SELECT
toIntervalMonth(1),
2
FROM t1__fuzz_26
GROUP BY
toFixedString('%Prewhere%', 10),
toNullable(12)
WITH ROLLUP
)), 1)) settings allow_experimental_parallel_reading_from_replicas = 0; -- { serverError INCORRECT_RESULT_OF_SCALAR_SUBQUERY }
DROP TABLE t1;
DROP TABLE t1__fuzz_26;

View File

@ -0,0 +1 @@
SELECT number FROM numbers(2, 1) WHERE number % 2 = 0 SETTINGS max_rows_to_read = 10;