Merge branch 'master' into boringssl2

Alexey Milovidov 2020-12-18 05:59:30 +03:00
commit 1be41beca0
79 changed files with 1932 additions and 353 deletions


@ -6,6 +6,7 @@ set (SRCS
demangle.cpp
getFQDNOrHostName.cpp
getMemoryAmount.cpp
getPageSize.cpp
getThreadId.cpp
JSON.cpp
LineReader.cpp


@ -1,5 +1,6 @@
#include <stdexcept>
#include "common/getMemoryAmount.h"
#include "common/getPageSize.h"
#include <unistd.h>
#include <sys/types.h>
@ -18,7 +19,7 @@ uint64_t getMemoryAmountOrZero()
if (num_pages <= 0)
return 0;
int64_t page_size = sysconf(_SC_PAGESIZE);
int64_t page_size = getPageSize();
if (page_size <= 0)
return 0;


@ -0,0 +1,8 @@
#include "common/getPageSize.h"
#include <unistd.h>
Int64 getPageSize()
{
return sysconf(_SC_PAGESIZE);
}


@ -0,0 +1,6 @@
#pragma once
#include "common/types.h"
/// Get memory page size
Int64 getPageSize();


@ -47,6 +47,7 @@ SRCS(
errnoToString.cpp
getFQDNOrHostName.cpp
getMemoryAmount.cpp
getPageSize.cpp
getResource.cpp
getThreadId.cpp
mremap.cpp

contrib/libgsasl vendored

@ -1 +1 @@
Subproject commit 140fb58250588c8323285b75fcf127c4adc33dfa
Subproject commit 383ee28e82f69fa16ed43b48bd9c8ee5b313ab84

contrib/libhdfs3 vendored

@ -1 +1 @@
Subproject commit 30552ac527f2c14070d834e171493b2e7f662375
Subproject commit 095b9d48b400abb72d967cb0539af13b1e3d90cf


@ -17,7 +17,12 @@ if (NOT USE_INTERNAL_PROTOBUF_LIBRARY AND PROTOBUF_OLD_ABI_COMPAT)
endif ()
endif()
set(WITH_KERBEROS false)
if (${ENABLE_LIBRARIES} AND ${ENABLE_KRB5})
SET(WITH_KERBEROS 1)
else()
SET(WITH_KERBEROS 0)
endif()
# project and source dir
set(HDFS3_ROOT_DIR ${ClickHouse_SOURCE_DIR}/contrib/libhdfs3)
set(HDFS3_SOURCE_DIR ${HDFS3_ROOT_DIR}/src)
@ -28,11 +33,6 @@ set(CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/CMake" ${CMAKE_MODULE_PATH})
include(Platform)
include(Options)
# prefer shared libraries
if (WITH_KERBEROS)
find_package(KERBEROS REQUIRED)
endif()
# source
set(PROTO_FILES
#${HDFS3_SOURCE_DIR}/proto/encryption.proto
@ -207,14 +207,11 @@ target_include_directories(hdfs3 PRIVATE ${HDFS3_COMMON_DIR})
target_include_directories(hdfs3 PRIVATE ${CMAKE_CURRENT_BINARY_DIR})
target_include_directories(hdfs3 PRIVATE ${LIBGSASL_INCLUDE_DIR})
if (WITH_KERBEROS)
target_include_directories(hdfs3 PRIVATE ${KERBEROS_INCLUDE_DIRS})
endif()
target_include_directories(hdfs3 PRIVATE ${LIBXML2_INCLUDE_DIR})
target_link_libraries(hdfs3 PRIVATE ${LIBGSASL_LIBRARY})
if (WITH_KERBEROS)
target_link_libraries(hdfs3 PRIVATE ${KERBEROS_LIBRARIES})
target_link_libraries(hdfs3 PRIVATE ${KRB5_LIBRARY})
endif()
target_link_libraries(hdfs3 PRIVATE ${LIBXML2_LIBRARIES})

debian/control vendored

@ -40,7 +40,7 @@ Description: Common files for ClickHouse
Package: clickhouse-server
Architecture: all
Depends: ${shlibs:Depends}, ${misc:Depends}, clickhouse-common-static (= ${binary:Version}), adduser
Recommends: libcap2-bin
Recommends: libcap2-bin, krb5-user
Replaces: clickhouse-server-common, clickhouse-server-base
Provides: clickhouse-server-common
Description: Server binary for ClickHouse


@ -158,5 +158,9 @@
"name": "yandex/clickhouse-stateless-unbundled-test",
"dependent": [
]
},
"docker/test/integration/kerberized_hadoop": {
"name": "yandex/clickhouse-kerberized-hadoop",
"dependent": []
}
}


@ -0,0 +1,18 @@
# docker build -t yandex/clickhouse-kerberized-hadoop .
FROM sequenceiq/hadoop-docker:2.7.0
RUN sed -i -e 's/^\#baseurl/baseurl/' /etc/yum.repos.d/CentOS-Base.repo
RUN sed -i -e 's/^mirrorlist/#mirrorlist/' /etc/yum.repos.d/CentOS-Base.repo
RUN sed -i -e 's#http://mirror.centos.org/#http://vault.centos.org/#' /etc/yum.repos.d/CentOS-Base.repo
RUN yum clean all && \
rpm --rebuilddb && \
yum -y update && \
yum -y install yum-plugin-ovl && \
yum --quiet -y install krb5-workstation.x86_64
RUN cd /tmp && \
curl http://archive.apache.org/dist/commons/daemon/source/commons-daemon-1.0.15-src.tar.gz -o commons-daemon-1.0.15-src.tar.gz && \
tar xzf commons-daemon-1.0.15-src.tar.gz && \
cd commons-daemon-1.0.15-src/src/native/unix && \
./configure && \
make && \
cp ./jsvc /usr/local/hadoop/sbin


@ -29,6 +29,8 @@ RUN apt-get update \
libcurl4-openssl-dev \
gdb \
software-properties-common \
libkrb5-dev \
krb5-user \
&& rm -rf \
/var/lib/apt/lists/* \
/var/cache/debconf \
@ -75,7 +77,8 @@ RUN python3 -m pip install \
pytest-timeout \
redis \
tzlocal \
urllib3
urllib3 \
requests-kerberos
COPY modprobe.sh /usr/local/bin/modprobe
COPY dockerd-entrypoint.sh /usr/local/bin/


@ -2,6 +2,7 @@ version: '2.3'
services:
hdfs1:
image: sequenceiq/hadoop-docker:2.7.0
hostname: hdfs1
restart: always
ports:
- 50075:50075


@ -0,0 +1,29 @@
version: '2.3'
services:
kerberizedhdfs1:
cap_add:
- DAC_READ_SEARCH
image: yandex/clickhouse-kerberized-hadoop:16621
hostname: kerberizedhdfs1
restart: always
volumes:
- ${KERBERIZED_HDFS_DIR}/../../hdfs_configs/bootstrap.sh:/etc/bootstrap.sh:ro
- ${KERBERIZED_HDFS_DIR}/secrets:/usr/local/hadoop/etc/hadoop/conf
- ${KERBERIZED_HDFS_DIR}/secrets/krb_long.conf:/etc/krb5.conf:ro
ports:
- 1006:1006
- 50070:50070
- 9000:9000
depends_on:
- hdfskerberos
entrypoint: /etc/bootstrap.sh -d
hdfskerberos:
image: yandex/clickhouse-kerberos-kdc:${DOCKER_KERBEROS_KDC_TAG}
hostname: hdfskerberos
volumes:
- ${KERBERIZED_HDFS_DIR}/secrets:/tmp/keytab
- ${KERBERIZED_HDFS_DIR}/../../kerberos_image_config.sh:/config.sh
- /dev/urandom:/dev/random
ports: [88, 749]


@ -108,6 +108,95 @@ Create table with files named `file000`, `file001`, … , `file999`:
``` sql
CREATE TABLE big_table (name String, value UInt32) ENGINE = HDFS('hdfs://hdfs1:9000/big_dir/file{0..9}{0..9}{0..9}', 'CSV')
```
## Configuration {#configuration}
Similar to GraphiteMergeTree, the HDFS engine supports extended configuration using the ClickHouse config file. There are two configuration keys that you can use: global (`hdfs`) and user-level (`hdfs_*`). The global configuration is applied first, and then the user-level configuration is applied (if it exists).
``` xml
<!-- Global configuration options for HDFS engine type -->
<hdfs>
<hadoop_kerberos_keytab>/tmp/keytab/clickhouse.keytab</hadoop_kerberos_keytab>
<hadoop_kerberos_principal>clickuser@TEST.CLICKHOUSE.TECH</hadoop_kerberos_principal>
<hadoop_security_authentication>kerberos</hadoop_security_authentication>
</hdfs>
<!-- Configuration specific for user "root" -->
<hdfs_root>
<hadoop_kerberos_principal>root@TEST.CLICKHOUSE.TECH</hadoop_kerberos_principal>
</hdfs_root>
```
### List of possible configuration options with default values
#### Supported by libhdfs3
| **parameter** | **default value** |
| --- | --- |
| rpc\_client\_connect\_tcpnodelay | true |
| dfs\_client\_read\_shortcircuit | true |
| output\_replace-datanode-on-failure | true |
| input\_notretry-another-node | false |
| input\_localread\_mappedfile | true |
| dfs\_client\_use\_legacy\_blockreader\_local | false |
| rpc\_client\_ping\_interval | 10 * 1000 |
| rpc\_client\_connect\_timeout | 600 * 1000 |
| rpc\_client\_read\_timeout | 3600 * 1000 |
| rpc\_client\_write\_timeout | 3600 * 1000 |
| rpc\_client\_socekt\_linger\_timeout | -1 |
| rpc\_client\_connect\_retry | 10 |
| rpc\_client\_timeout | 3600 * 1000 |
| dfs\_default\_replica | 3 |
| input\_connect\_timeout | 600 * 1000 |
| input\_read\_timeout | 3600 * 1000 |
| input\_write\_timeout | 3600 * 1000 |
| input\_localread\_default\_buffersize | 1 * 1024 * 1024 |
| dfs\_prefetchsize | 10 |
| input\_read\_getblockinfo\_retry | 3 |
| input\_localread\_blockinfo\_cachesize | 1000 |
| input\_read\_max\_retry | 60 |
| output\_default\_chunksize | 512 |
| output\_default\_packetsize | 64 * 1024 |
| output\_default\_write\_retry | 10 |
| output\_connect\_timeout | 600 * 1000 |
| output\_read\_timeout | 3600 * 1000 |
| output\_write\_timeout | 3600 * 1000 |
| output\_close\_timeout | 3600 * 1000 |
| output\_packetpool\_size | 1024 |
| output\_heeartbeat\_interval | 10 * 1000 |
| dfs\_client\_failover\_max\_attempts | 15 |
| dfs\_client\_read\_shortcircuit\_streams\_cache\_size | 256 |
| dfs\_client\_socketcache\_expiryMsec | 3000 |
| dfs\_client\_socketcache\_capacity | 16 |
| dfs\_default\_blocksize | 64 * 1024 * 1024 |
| dfs\_default\_uri | "hdfs://localhost:9000" |
| hadoop\_security\_authentication | "simple" |
| hadoop\_security\_kerberos\_ticket\_cache\_path | "" |
| dfs\_client\_log\_severity | "INFO" |
| dfs\_domain\_socket\_path | "" |
The [HDFS Configuration Reference](https://hawq.apache.org/docs/userguide/2.3.0.0-incubating/reference/HDFSConfigurationParameterReference.html) may help explain some of these parameters.
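For example, a minimal sketch of overriding a couple of the libhdfs3 defaults above through the global `hdfs` section (the values are illustrative placeholders; underscores in the key names are translated to dots before being passed to libhdfs3):
``` xml
<hdfs>
    <!-- placeholder values, for illustration only -->
    <rpc_client_connect_timeout>250</rpc_client_connect_timeout>
    <dfs_client_read_shortcircuit>false</dfs_client_read_shortcircuit>
</hdfs>
```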
#### ClickHouse extras {#clickhouse-extras}
| **parameter** | **default value** |
| --- | --- |
| hadoop\_kerberos\_keytab | "" |
| hadoop\_kerberos\_principal | "" |
| hadoop\_kerberos\_kinit\_command | kinit |
#### Limitations {#limitations}
* hadoop\_security\_kerberos\_ticket\_cache\_path can only be set globally, not per user
## Kerberos support {#kerberos-support}
If the hadoop\_security\_authentication parameter has the value 'kerberos', ClickHouse authenticates via Kerberos.
The parameters listed [here](#clickhouse-extras) and hadoop\_security\_kerberos\_ticket\_cache\_path may be of help.
Note that due to libhdfs3 limitations only the old-fashioned approach is supported:
datanode communications are not secured by SASL (HADOOP\_SECURE\_DN\_USER is a reliable indicator of this
security approach). Use tests/integration/test\_storage\_kerberized\_hdfs/hdfs_configs/bootstrap.sh for reference.
If hadoop\_kerberos\_keytab, hadoop\_kerberos\_principal or hadoop\_kerberos\_kinit\_command is specified, kinit will be invoked; hadoop\_kerberos\_keytab and hadoop\_kerberos\_principal are mandatory in this case. The kinit tool and krb5 configuration files are required.
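A minimal sketch of a global configuration that makes the HDFS engine obtain a ticket via kinit (keytab path and principal are placeholders taken from the example above):
``` xml
<hdfs>
    <hadoop_security_authentication>kerberos</hadoop_security_authentication>
    <hadoop_kerberos_keytab>/tmp/keytab/clickhouse.keytab</hadoop_kerberos_keytab>
    <hadoop_kerberos_principal>clickuser@TEST.CLICKHOUSE.TECH</hadoop_kerberos_principal>
</hdfs>
```
With such a configuration, kinit is invoked roughly as `kinit -R -t /tmp/keytab/clickhouse.keytab -k clickuser@TEST.CLICKHOUSE.TECH`.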
## Virtual Columns {#virtual-columns}


@ -388,7 +388,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
LOG_WARNING(log, "Server was built in debug mode. It will work slowly.");
#endif
#if defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER) || defined(MEMORY_SANITIZER)
#if defined(SANITIZER)
LOG_WARNING(log, "Server was built with sanitizer. It will work slowly.");
#endif


@ -392,9 +392,12 @@ bool ContextAccess::checkAccessImpl2(const AccessFlags & flags, const Args &...
if (!getUser())
return access_denied("User has been dropped", ErrorCodes::UNKNOWN_USER);
/// If the current user was allowed to create a temporary table
/// then he is allowed to do with it whatever he wants.
if ((sizeof...(args) >= 2) && (getDatabase(args...) == DatabaseCatalog::TEMPORARY_DATABASE))
/// Access to temporary tables is controlled in an unusual way, not like normal tables.
/// Creating of temporary tables is controlled by AccessType::CREATE_TEMPORARY_TABLES grant,
/// and other grants are considered as always given.
/// The DatabaseCatalog class won't resolve StorageID for temporary tables
/// which shouldn't be accessed.
if (getDatabase(args...) == DatabaseCatalog::TEMPORARY_DATABASE)
return access_granted();
auto acs = getAccessRightsWithImplicit();


@ -88,6 +88,10 @@ if (USE_AWS_S3)
add_headers_and_sources(dbms Disks/S3)
endif()
if (USE_HDFS)
add_headers_and_sources(dbms Storages/HDFS)
endif()
list (APPEND clickhouse_common_io_sources ${CONFIG_BUILD})
list (APPEND clickhouse_common_io_headers ${CONFIG_VERSION} ${CONFIG_COMMON})
@ -389,8 +393,8 @@ if (USE_GRPC)
endif()
if (USE_HDFS)
target_link_libraries (clickhouse_common_io PUBLIC ${HDFS3_LIBRARY})
target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${HDFS3_INCLUDE_DIR})
dbms_target_link_libraries(PRIVATE ${HDFS3_LIBRARY})
dbms_target_include_directories (SYSTEM BEFORE PUBLIC ${HDFS3_INCLUDE_DIR})
endif()
if (USE_AWS_S3)


@ -26,6 +26,7 @@
#define DISABLE_MREMAP 1
#endif
#include <common/mremap.h>
#include <common/getPageSize.h>
#include <Common/MemoryTracker.h>
#include <Common/Exception.h>
@ -59,7 +60,6 @@
*/
extern const size_t MMAP_THRESHOLD;
static constexpr size_t MMAP_MIN_ALIGNMENT = 4096;
static constexpr size_t MALLOC_MIN_ALIGNMENT = 8;
namespace DB
@ -194,10 +194,11 @@ private:
void * allocNoTrack(size_t size, size_t alignment)
{
void * buf;
size_t mmap_min_alignment = ::getPageSize();
if (size >= MMAP_THRESHOLD)
{
if (alignment > MMAP_MIN_ALIGNMENT)
if (alignment > mmap_min_alignment)
throw DB::Exception(fmt::format("Too large alignment {}: more than page size when allocating {}.",
ReadableSize(alignment), ReadableSize(size)), DB::ErrorCodes::BAD_ARGUMENTS);


@ -83,10 +83,11 @@ private:
/// Last contiguous chunk of memory.
Chunk * head;
size_t size_in_bytes;
size_t page_size;
static size_t roundUpToPageSize(size_t s)
static size_t roundUpToPageSize(size_t s, size_t page_size)
{
return (s + 4096 - 1) / 4096 * 4096;
return (s + page_size - 1) / page_size * page_size;
}
/// If chunks size is less than 'linear_growth_threshold', then use exponential growth, otherwise - linear growth
@ -113,7 +114,7 @@ private:
}
assert(size_after_grow >= min_next_size);
return roundUpToPageSize(size_after_grow);
return roundUpToPageSize(size_after_grow, page_size);
}
/// Add next contiguous chunk of memory with size not less than specified.
@ -129,7 +130,8 @@ private:
public:
Arena(size_t initial_size_ = 4096, size_t growth_factor_ = 2, size_t linear_growth_threshold_ = 128 * 1024 * 1024)
: growth_factor(growth_factor_), linear_growth_threshold(linear_growth_threshold_),
head(new Chunk(initial_size_, nullptr)), size_in_bytes(head->size())
head(new Chunk(initial_size_, nullptr)), size_in_bytes(head->size()),
page_size(static_cast<size_t>(::getPageSize()))
{
}


@ -13,6 +13,8 @@
#include <boost/noncopyable.hpp>
#include <ext/scope_guard.h>
#include <common/getPageSize.h>
#include <Common/Exception.h>
#include <Common/randomSeed.h>
#include <Common/formatReadable.h>
@ -326,8 +328,6 @@ private:
return (x + (rounding - 1)) / rounding * rounding;
}
static constexpr size_t page_size = 4096;
/// Sizes and addresses of allocated memory will be aligned to specified boundary.
static constexpr size_t alignment = 16;
@ -505,6 +505,7 @@ private:
/// If nothing was found and total size of allocated chunks plus required size is lower than maximum,
/// allocate a new chunk.
size_t page_size = static_cast<size_t>(::getPageSize());
size_t required_chunk_size = std::max(min_chunk_size, roundUp(size, page_size));
if (total_chunks_size + required_chunk_size <= max_total_size)
{


@ -8,10 +8,11 @@
#include "MemoryStatisticsOS.h"
#include <common/logger_useful.h>
#include <common/getPageSize.h>
#include <Common/Exception.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/ReadHelpers.h>
#include <common/logger_useful.h>
namespace DB
@ -26,7 +27,6 @@ namespace ErrorCodes
}
static constexpr auto filename = "/proc/self/statm";
static constexpr size_t PAGE_SIZE = 4096;
MemoryStatisticsOS::MemoryStatisticsOS()
{
@ -93,11 +93,12 @@ MemoryStatisticsOS::Data MemoryStatisticsOS::get() const
skipWhitespaceIfAny(in);
readIntText(data.data_and_stack, in);
data.virt *= PAGE_SIZE;
data.resident *= PAGE_SIZE;
data.shared *= PAGE_SIZE;
data.code *= PAGE_SIZE;
data.data_and_stack *= PAGE_SIZE;
size_t page_size = static_cast<size_t>(::getPageSize());
data.virt *= page_size;
data.resident *= page_size;
data.shared *= page_size;
data.code *= page_size;
data.data_and_stack *= page_size;
return data;
}


@ -1,5 +1,6 @@
#pragma once
#include <common/getPageSize.h>
#include <Common/Exception.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/UTF8Helpers.h>
@ -37,7 +38,7 @@ struct StringSearcherBase
{
#ifdef __SSE2__
static constexpr auto n = sizeof(__m128i);
const int page_size = getpagesize();
const int page_size = ::getPageSize();
bool pageSafe(const void * const ptr) const
{


@ -2,11 +2,14 @@
#include <Common/ThreadProfileEvents.h>
#include <Common/QueryProfiler.h>
#include <Common/ThreadStatus.h>
#include <common/errnoToString.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Poco/Logger.h>
#include <common/getThreadId.h>
#include <signal.h>
namespace DB
{
@ -21,6 +24,11 @@ namespace ErrorCodes
thread_local ThreadStatus * current_thread = nullptr;
thread_local ThreadStatus * main_thread = nullptr;
#if !defined(SANITIZER) && !defined(ARCADIA_BUILD)
alignas(4096) static thread_local char alt_stack[4096];
static thread_local bool has_alt_stack = false;
#endif
ThreadStatus::ThreadStatus()
: thread_id{getThreadId()}
@ -34,6 +42,46 @@ ThreadStatus::ThreadStatus()
/// NOTE: It is important not to do any non-trivial actions (like updating ProfileEvents or logging) before ThreadStatus is created
/// Otherwise it could lead to SIGSEGV due to current_thread dereferencing
/// Will set alternative signal stack to provide diagnostics for stack overflow errors.
/// If not already installed for current thread.
/// Sanitizer makes larger stack usage and also it's incompatible with alternative stack by default (it sets up and relies on its own).
#if !defined(SANITIZER) && !defined(ARCADIA_BUILD)
if (!has_alt_stack)
{
/// Don't repeat tries even if not installed successfully.
has_alt_stack = true;
/// We have to call 'sigaltstack' before first 'sigaction'. (It does not work other way, for unknown reason).
stack_t altstack_description{};
altstack_description.ss_sp = alt_stack;
altstack_description.ss_flags = 0;
altstack_description.ss_size = sizeof(alt_stack);
if (0 != sigaltstack(&altstack_description, nullptr))
{
LOG_WARNING(log, "Cannot set alternative signal stack for thread, {}", errnoToString(errno));
}
else
{
/// Obtain existing sigaction and modify it by adding a flag.
struct sigaction action{};
if (0 != sigaction(SIGSEGV, nullptr, &action))
{
LOG_WARNING(log, "Cannot obtain previous signal action to set alternative signal stack for thread, {}", errnoToString(errno));
}
else if (!(action.sa_flags & SA_ONSTACK))
{
action.sa_flags |= SA_ONSTACK;
if (0 != sigaction(SIGSEGV, &action, nullptr))
{
LOG_WARNING(log, "Cannot set action with alternative signal stack for thread, {}", errnoToString(errno));
}
}
}
}
#endif
}
ThreadStatus::~ThreadStatus()


@ -5,6 +5,7 @@
#cmakedefine01 USE_RE2_ST
#cmakedefine01 USE_SSL
#cmakedefine01 USE_HDFS
#cmakedefine01 USE_INTERNAL_HDFS3_LIBRARY
#cmakedefine01 USE_AWS_S3
#cmakedefine01 USE_BROTLI
#cmakedefine01 USE_UNWIND


@ -61,9 +61,10 @@ public:
return std::make_shared<DataTypeUInt8>();
}
[[clang::optnone]] void executeImpl(Block & columns, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) const override
[[clang::optnone]]
ColumnPtr executeImpl(const ColumnsWithTypeAndName & block, const DataTypePtr & result_type, size_t input_rows_count) const override
{
if (const ColumnConst * column = checkAndGetColumnConst<ColumnString>(columns[arguments[0]].column.get()))
if (const ColumnConst * column = checkAndGetColumnConst<ColumnString>(block[0].column.get()))
{
String mode = column->getValue<String>();
@ -135,6 +136,10 @@ public:
{
(void)context.getCurrentQueryId();
}
else if (mode == "stack overflow")
{
executeImpl(block, result_type, input_rows_count);
}
else if (mode == "mmap many")
{
std::vector<void *> maps;
@ -160,7 +165,7 @@ public:
else
throw Exception("The only argument for function " + getName() + " must be constant String", ErrorCodes::ILLEGAL_COLUMN);
columns[result].column = columns[result].type->createColumnConst(input_rows_count, 0ULL);
return result_type->createColumnConst(input_rows_count, 0ULL);
}
};


@ -1,62 +0,0 @@
#include <IO/HDFSCommon.h>
#include <Poco/URI.h>
#if USE_HDFS
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int NETWORK_ERROR;
}
HDFSBuilderPtr createHDFSBuilder(const std::string & uri_str)
{
const Poco::URI uri(uri_str);
const auto & host = uri.getHost();
auto port = uri.getPort();
const std::string path = "//";
if (host.empty())
throw Exception("Illegal HDFS URI: " + uri.toString(), ErrorCodes::BAD_ARGUMENTS);
HDFSBuilderPtr builder(hdfsNewBuilder());
if (builder == nullptr)
throw Exception("Unable to create builder to connect to HDFS: " + uri.toString() + " " + std::string(hdfsGetLastError()),
ErrorCodes::NETWORK_ERROR);
hdfsBuilderConfSetStr(builder.get(), "input.read.timeout", "60000"); // 1 min
hdfsBuilderConfSetStr(builder.get(), "input.write.timeout", "60000"); // 1 min
hdfsBuilderConfSetStr(builder.get(), "input.connect.timeout", "60000"); // 1 min
std::string user_info = uri.getUserInfo();
if (!user_info.empty() && user_info.front() != ':')
{
std::string user;
size_t delim_pos = user_info.find(':');
if (delim_pos != std::string::npos)
user = user_info.substr(0, delim_pos);
else
user = user_info;
hdfsBuilderSetUserName(builder.get(), user.c_str());
}
hdfsBuilderSetNameNode(builder.get(), host.c_str());
if (port != 0)
{
hdfsBuilderSetNameNodePort(builder.get(), port);
}
return builder;
}
HDFSFSPtr createHDFSFS(hdfsBuilder * builder)
{
HDFSFSPtr fs(hdfsBuilderConnect(builder));
if (fs == nullptr)
throw Exception("Unable to connect to HDFS: " + std::string(hdfsGetLastError()),
ErrorCodes::NETWORK_ERROR);
return fs;
}
}
#endif


@ -1,58 +0,0 @@
#pragma once
#include <Common/config.h>
#include <memory>
#include <type_traits>
#if USE_HDFS
#include <hdfs/hdfs.h>
namespace DB
{
namespace detail
{
struct HDFSBuilderDeleter
{
void operator()(hdfsBuilder * builder_ptr)
{
hdfsFreeBuilder(builder_ptr);
}
};
struct HDFSFsDeleter
{
void operator()(hdfsFS fs_ptr)
{
hdfsDisconnect(fs_ptr);
}
};
}
struct HDFSFileInfo
{
hdfsFileInfo * file_info;
int length;
HDFSFileInfo()
: file_info(nullptr)
, length(0)
{
}
HDFSFileInfo(const HDFSFileInfo & other) = delete;
HDFSFileInfo(HDFSFileInfo && other) = default;
HDFSFileInfo & operator=(const HDFSFileInfo & other) = delete;
HDFSFileInfo & operator=(HDFSFileInfo && other) = default;
~HDFSFileInfo()
{
hdfsFreeFileInfo(file_info, length);
}
};
using HDFSBuilderPtr = std::unique_ptr<hdfsBuilder, detail::HDFSBuilderDeleter>;
using HDFSFSPtr = std::unique_ptr<std::remove_pointer_t<hdfsFS>, detail::HDFSFsDeleter>;
// set read/connect timeout, default value in libhdfs3 is about 1 hour, and too large
/// TODO Allow to tune from query Settings.
HDFSBuilderPtr createHDFSBuilder(const std::string & uri_str);
HDFSFSPtr createHDFSFS(hdfsBuilder * builder);
}
#endif


@ -6,6 +6,7 @@
#include <Common/ProfileEvents.h>
#include <Common/formatReadable.h>
#include <Common/Exception.h>
#include <common/getPageSize.h>
#include <IO/WriteHelpers.h>
#include <IO/MMapReadBufferFromFileDescriptor.h>
@ -38,7 +39,9 @@ void MMapReadBufferFromFileDescriptor::init(int fd_, size_t offset, size_t lengt
ErrorCodes::CANNOT_ALLOCATE_MEMORY);
BufferBase::set(static_cast<char *>(buf), length, 0);
ReadBuffer::padded = (length % 4096) > 0 && (length % 4096) <= (4096 - 15); /// TODO determine page size
size_t page_size = static_cast<size_t>(::getPageSize());
ReadBuffer::padded = (length % page_size) > 0 && (length % page_size) <= (page_size - 15);
}
}


@ -3,6 +3,7 @@
#include <iostream>
#include <common/types.h>
#include <common/getPageSize.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
@ -16,6 +17,7 @@ int main(int, char **)
{
static const size_t N = 100000;
static const size_t BUF_SIZE = 1048576;
size_t page_size = static_cast<size_t>(::getPageSize());
ReadBufferFromFile rand_in("/dev/urandom");
unsigned rand = 0;
@ -33,7 +35,7 @@ int main(int, char **)
}
{
ReadBufferFromFile rb("test1", BUF_SIZE, O_RDONLY | O_DIRECT, nullptr, 4096);
ReadBufferFromFile rb("test1", BUF_SIZE, O_RDONLY | O_DIRECT, nullptr, page_size);
String res;
for (size_t i = 0; i < N; ++i)
readStringBinary(res, rb);
@ -44,14 +46,14 @@ int main(int, char **)
/// Write to file with O_DIRECT, read as usual.
{
WriteBufferFromFile wb("test2", BUF_SIZE, O_WRONLY | O_CREAT | O_TRUNC | O_DIRECT, 0666, nullptr, 4096);
WriteBufferFromFile wb("test2", BUF_SIZE, O_WRONLY | O_CREAT | O_TRUNC | O_DIRECT, 0666, nullptr, page_size);
for (size_t i = 0; i < N; ++i)
writeStringBinary(test, wb);
if (wb.offset() % 4096 != 0)
if (wb.offset() % page_size != 0)
{
size_t pad = 4096 - wb.offset() % 4096;
size_t pad = page_size - wb.offset() % page_size;
memset(wb.position(), 0, pad);
wb.position() += pad;
}


@ -5,6 +5,7 @@
#include <Common/TaskStatsInfoGetter.h>
#include <Poco/File.h>
#include <Common/Stopwatch.h>
#include <common/getPageSize.h>
#include <common/getThreadId.h>
#include <IO/WriteBufferFromString.h>
#include <linux/taskstats.h>
@ -61,8 +62,9 @@ static void do_io(size_t id)
std::string path_dst = "test_out_" + std::to_string(id);
{
size_t page_size = static_cast<size_t>(::getPageSize());
ReadBufferFromFile rb("/dev/urandom");
WriteBufferFromFile wb(path_dst, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT | O_TRUNC | O_DIRECT, 0666, nullptr, 4096);
WriteBufferFromFile wb(path_dst, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT | O_TRUNC | O_DIRECT, 0666, nullptr, page_size);
copyData(rb, wb, copy_size);
wb.close();
}


@ -0,0 +1,194 @@
#include <Storages/HDFS/HDFSCommon.h>
#include <Poco/URI.h>
#include <boost/algorithm/string/replace.hpp>
#if USE_HDFS
#include <Common/ShellCommand.h>
#include <Common/Exception.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int NETWORK_ERROR;
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
extern const int NO_ELEMENTS_IN_CONFIG;
}
const String HDFSBuilderWrapper::CONFIG_PREFIX = "hdfs";
void HDFSBuilderWrapper::loadFromConfig(const Poco::Util::AbstractConfiguration & config,
const String & config_path, bool isUser)
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_path, keys);
for (const auto & key : keys)
{
const String key_path = config_path + "." + key;
String key_name;
if (key == "hadoop_kerberos_keytab")
{
need_kinit = true;
hadoop_kerberos_keytab = config.getString(key_path);
continue;
}
else if (key == "hadoop_kerberos_principal")
{
need_kinit = true;
hadoop_kerberos_principal = config.getString(key_path);
#if USE_INTERNAL_HDFS3_LIBRARY
hdfsBuilderSetPrincipal(hdfs_builder, hadoop_kerberos_principal.c_str());
#endif
continue;
}
else if (key == "hadoop_kerberos_kinit_command")
{
need_kinit = true;
hadoop_kerberos_kinit_command = config.getString(key_path);
continue;
}
else if (key == "hadoop_security_kerberos_ticket_cache_path")
{
if (isUser)
{
throw Exception("hadoop.security.kerberos.ticket.cache.path cannot be set per user",
ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
}
hadoop_security_kerberos_ticket_cache_path = config.getString(key_path);
// standard param - pass further
}
key_name = boost::replace_all_copy(key, "_", ".");
const auto & [k,v] = keep(key_name, config.getString(key_path));
hdfsBuilderConfSetStr(hdfs_builder, k.c_str(), v.c_str());
}
}
String HDFSBuilderWrapper::getKinitCmd()
{
if (hadoop_kerberos_keytab.empty() || hadoop_kerberos_principal.empty())
{
throw Exception("Not enough parameters to run kinit",
ErrorCodes::NO_ELEMENTS_IN_CONFIG);
}
WriteBufferFromOwnString ss;
String cache_name = hadoop_security_kerberos_ticket_cache_path.empty() ?
String() :
(String(" -c \"") + hadoop_security_kerberos_ticket_cache_path + "\"");
// command to run looks like
// kinit -R -t /keytab_dir/clickhouse.keytab -k somebody@TEST.CLICKHOUSE.TECH || ..
ss << hadoop_kerberos_kinit_command << cache_name <<
" -R -t \"" << hadoop_kerberos_keytab << "\" -k " << hadoop_kerberos_principal <<
"|| " << hadoop_kerberos_kinit_command << cache_name << " -t \"" <<
hadoop_kerberos_keytab << "\" -k " << hadoop_kerberos_principal;
return ss.str();
}
void HDFSBuilderWrapper::runKinit()
{
String cmd = getKinitCmd();
LOG_DEBUG(&Poco::Logger::get("HDFSClient"), "running kinit: {}", cmd);
std::unique_lock<std::mutex> lck(kinit_mtx);
auto command = ShellCommand::execute(cmd);
auto status = command->tryWait();
if (status)
{
throw Exception("kinit failure: " + cmd, ErrorCodes::BAD_ARGUMENTS);
}
}
HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Poco::Util::AbstractConfiguration & config)
{
const Poco::URI uri(uri_str);
const auto & host = uri.getHost();
auto port = uri.getPort();
const String path = "//";
if (host.empty())
throw Exception("Illegal HDFS URI: " + uri.toString(), ErrorCodes::BAD_ARGUMENTS);
HDFSBuilderWrapper builder;
if (builder.get() == nullptr)
throw Exception("Unable to create builder to connect to HDFS: " +
uri.toString() + " " + String(hdfsGetLastError()),
ErrorCodes::NETWORK_ERROR);
hdfsBuilderConfSetStr(builder.get(), "input.read.timeout", "60000"); // 1 min
hdfsBuilderConfSetStr(builder.get(), "input.write.timeout", "60000"); // 1 min
hdfsBuilderConfSetStr(builder.get(), "input.connect.timeout", "60000"); // 1 min
String user_info = uri.getUserInfo();
String user;
if (!user_info.empty() && user_info.front() != ':')
{
size_t delim_pos = user_info.find(':');
if (delim_pos != String::npos)
user = user_info.substr(0, delim_pos);
else
user = user_info;
hdfsBuilderSetUserName(builder.get(), user.c_str());
}
hdfsBuilderSetNameNode(builder.get(), host.c_str());
if (port != 0)
{
hdfsBuilderSetNameNodePort(builder.get(), port);
}
if (config.has(HDFSBuilderWrapper::CONFIG_PREFIX))
{
builder.loadFromConfig(config, HDFSBuilderWrapper::CONFIG_PREFIX);
}
if (!user.empty())
{
String user_config_prefix = HDFSBuilderWrapper::CONFIG_PREFIX + "_" + user;
if (config.has(user_config_prefix))
{
#if USE_INTERNAL_HDFS3_LIBRARY
builder.loadFromConfig(config, user_config_prefix, true);
#else
throw Exception("Multi user HDFS configuration required internal libhdfs3",
ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
#endif
}
}
if (builder.need_kinit)
{
builder.runKinit();
}
return builder;
}
std::mutex HDFSBuilderWrapper::kinit_mtx;
HDFSFSPtr createHDFSFS(hdfsBuilder * builder)
{
HDFSFSPtr fs(hdfsBuilderConnect(builder));
if (fs == nullptr)
throw Exception("Unable to connect to HDFS: " + String(hdfsGetLastError()),
ErrorCodes::NETWORK_ERROR);
return fs;
}
}
#endif


@ -0,0 +1,114 @@
#pragma once
#include <Common/config.h>
#if USE_HDFS
#include <memory>
#include <type_traits>
#include <vector>
#include <hdfs/hdfs.h>
#include <common/types.h>
#include <mutex>
#include <Interpreters/Context.h>
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
{
namespace detail
{
struct HDFSFsDeleter
{
void operator()(hdfsFS fs_ptr)
{
hdfsDisconnect(fs_ptr);
}
};
}
struct HDFSFileInfo
{
hdfsFileInfo * file_info;
int length;
HDFSFileInfo()
: file_info(nullptr)
, length(0)
{
}
HDFSFileInfo(const HDFSFileInfo & other) = delete;
HDFSFileInfo(HDFSFileInfo && other) = default;
HDFSFileInfo & operator=(const HDFSFileInfo & other) = delete;
HDFSFileInfo & operator=(HDFSFileInfo && other) = default;
~HDFSFileInfo()
{
hdfsFreeFileInfo(file_info, length);
}
};
class HDFSBuilderWrapper
{
hdfsBuilder * hdfs_builder;
String hadoop_kerberos_keytab;
String hadoop_kerberos_principal;
String hadoop_kerberos_kinit_command = "kinit";
String hadoop_security_kerberos_ticket_cache_path;
static std::mutex kinit_mtx;
std::vector<std::pair<String, String>> config_stor;
// hdfs builder relies on an external config data storage
std::pair<String, String>& keep(const String & k, const String & v)
{
return config_stor.emplace_back(std::make_pair(k, v));
}
bool need_kinit{false};
static const String CONFIG_PREFIX;
private:
void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_path, bool isUser = false);
String getKinitCmd();
void runKinit();
public:
hdfsBuilder *
get()
{
return hdfs_builder;
}
HDFSBuilderWrapper()
: hdfs_builder(hdfsNewBuilder())
{
}
~HDFSBuilderWrapper()
{
hdfsFreeBuilder(hdfs_builder);
}
HDFSBuilderWrapper(const HDFSBuilderWrapper &) = delete;
HDFSBuilderWrapper(HDFSBuilderWrapper &&) = default;
friend HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Poco::Util::AbstractConfiguration &);
};
using HDFSFSPtr = std::unique_ptr<std::remove_pointer_t<hdfsFS>, detail::HDFSFsDeleter>;
// set read/connect timeout, default value in libhdfs3 is about 1 hour, and too large
/// TODO Allow to tune from query Settings.
HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Poco::Util::AbstractConfiguration &);
HDFSFSPtr createHDFSFS(hdfsBuilder * builder);
}
#endif


@ -1,7 +1,7 @@
#include "ReadBufferFromHDFS.h"
#if USE_HDFS
#include <IO/HDFSCommon.h>
#include <Storages/HDFS/HDFSCommon.h>
#include <hdfs/hdfs.h>
#include <mutex>
@ -23,15 +23,16 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
std::string hdfs_uri;
hdfsFile fin;
HDFSBuilderPtr builder;
HDFSBuilderWrapper builder;
HDFSFSPtr fs;
explicit ReadBufferFromHDFSImpl(const std::string & hdfs_name_)
: hdfs_uri(hdfs_name_)
explicit ReadBufferFromHDFSImpl(const std::string & hdfs_name_,
const Poco::Util::AbstractConfiguration & config_)
: hdfs_uri(hdfs_name_),
builder(createHDFSBuilder(hdfs_uri, config_))
{
std::lock_guard lock(hdfs_init_mutex);
builder = createHDFSBuilder(hdfs_uri);
fs = createHDFSFS(builder.get());
const size_t begin_of_path = hdfs_uri.find('/', hdfs_uri.find("//") + 2);
const std::string path = hdfs_uri.substr(begin_of_path);
@ -58,11 +59,14 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
}
};
std::mutex ReadBufferFromHDFS::ReadBufferFromHDFSImpl::hdfs_init_mutex;
ReadBufferFromHDFS::ReadBufferFromHDFS(const std::string & hdfs_name_, size_t buf_size)
: BufferWithOwnMemory<ReadBuffer>(buf_size)
, impl(std::make_unique<ReadBufferFromHDFSImpl>(hdfs_name_))
ReadBufferFromHDFS::ReadBufferFromHDFS(const std::string & hdfs_name_,
const Poco::Util::AbstractConfiguration & config_,
size_t buf_size_)
: BufferWithOwnMemory<ReadBuffer>(buf_size_)
, impl(std::make_unique<ReadBufferFromHDFSImpl>(hdfs_name_, config_))
{
}


@ -8,6 +8,13 @@
#include <string>
#include <memory>
#include <hdfs/hdfs.h>
#include <common/types.h>
#include <Interpreters/Context.h>
namespace DB
{
/** Accepts HDFS path to file and opens it.
@ -18,7 +25,7 @@ class ReadBufferFromHDFS : public BufferWithOwnMemory<ReadBuffer>
struct ReadBufferFromHDFSImpl;
std::unique_ptr<ReadBufferFromHDFSImpl> impl;
public:
ReadBufferFromHDFS(const std::string & hdfs_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
ReadBufferFromHDFS(const std::string & hdfs_name_, const Poco::Util::AbstractConfiguration &, size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE);
~ReadBufferFromHDFS() override;
bool nextImpl() override;


@ -3,15 +3,15 @@
#if USE_HDFS
#include <Storages/StorageFactory.h>
#include <Storages/StorageHDFS.h>
#include <Storages/HDFS/StorageHDFS.h>
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTLiteral.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromHDFS.h>
#include <IO/WriteBufferFromHDFS.h>
#include <Storages/HDFS/ReadBufferFromHDFS.h>
#include <Storages/HDFS/WriteBufferFromHDFS.h>
#include <IO/WriteHelpers.h>
#include <IO/HDFSCommon.h>
#include <Storages/HDFS/HDFSCommon.h>
#include <Formats/FormatFactory.h>
#include <DataTypes/DataTypeString.h>
#include <DataStreams/IBlockOutputStream.h>
@ -121,7 +121,7 @@ public:
current_path = uri + path;
auto compression = chooseCompressionMethod(path, compression_method);
auto read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(current_path), compression);
auto read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(current_path, context.getGlobalContext().getConfigRef()), compression);
auto input_stream = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
reader = std::make_shared<OwningBlockInputStream<ReadBuffer>>(input_stream, std::move(read_buf));
@ -180,7 +180,7 @@ public:
const CompressionMethod compression_method)
: sample_block(sample_block_)
{
write_buf = wrapWriteBufferWithCompressionMethod(std::make_unique<WriteBufferFromHDFS>(uri), compression_method, 3);
write_buf = wrapWriteBufferWithCompressionMethod(std::make_unique<WriteBufferFromHDFS>(uri, context.getGlobalContext().getConfigRef()), compression_method, 3);
writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context);
}
@ -274,7 +274,7 @@ Pipe StorageHDFS::read(
const String path_from_uri = uri.substr(begin_of_path);
const String uri_without_path = uri.substr(0, begin_of_path);
HDFSBuilderPtr builder = createHDFSBuilder(uri_without_path + "/");
HDFSBuilderWrapper builder = createHDFSBuilder(uri_without_path + "/", context_.getGlobalContext().getConfigRef());
HDFSFSPtr fs = createHDFSFS(builder.get());
auto sources_info = std::make_shared<HDFSSource::SourcesInfo>();


@ -2,8 +2,9 @@
#if USE_HDFS
#include <IO/WriteBufferFromHDFS.h>
#include <IO/HDFSCommon.h>
#include <Interpreters/Context.h>
#include <Storages/HDFS/WriteBufferFromHDFS.h>
#include <Storages/HDFS/HDFSCommon.h>
#include <hdfs/hdfs.h>
@ -23,12 +24,12 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
{
std::string hdfs_uri;
hdfsFile fout;
HDFSBuilderPtr builder;
HDFSBuilderWrapper builder;
HDFSFSPtr fs;
explicit WriteBufferFromHDFSImpl(const std::string & hdfs_name_)
explicit WriteBufferFromHDFSImpl(const std::string & hdfs_name_, const Poco::Util::AbstractConfiguration & config_)
: hdfs_uri(hdfs_name_)
, builder(createHDFSBuilder(hdfs_uri))
, builder(createHDFSBuilder(hdfs_uri,config_))
, fs(createHDFSFS(builder.get()))
{
const size_t begin_of_path = hdfs_uri.find('/', hdfs_uri.find("//") + 2);
@ -72,9 +73,9 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
}
};
WriteBufferFromHDFS::WriteBufferFromHDFS(const std::string & hdfs_name_, size_t buf_size)
: BufferWithOwnMemory<WriteBuffer>(buf_size)
, impl(std::make_unique<WriteBufferFromHDFSImpl>(hdfs_name_))
WriteBufferFromHDFS::WriteBufferFromHDFS(const std::string & hdfs_name_, const Poco::Util::AbstractConfiguration & config_, size_t buf_size_)
: BufferWithOwnMemory<WriteBuffer>(buf_size_)
, impl(std::make_unique<WriteBufferFromHDFSImpl>(hdfs_name_, config_))
{
}


@ -18,7 +18,7 @@ class WriteBufferFromHDFS : public BufferWithOwnMemory<WriteBuffer>
struct WriteBufferFromHDFSImpl;
std::unique_ptr<WriteBufferFromHDFSImpl> impl;
public:
WriteBufferFromHDFS(const std::string & hdfs_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
WriteBufferFromHDFS(const std::string & hdfs_name_, const Poco::Util::AbstractConfiguration &, size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE);
WriteBufferFromHDFS(WriteBufferFromHDFS &&) = default;
void nextImpl() override;


@ -491,6 +491,8 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
if (!zookeeper->tryGet(quorum_info.status_path, value, nullptr, event))
break;
LOG_TRACE(log, "Quorum node {} still exists, will wait for updates", quorum_info.status_path);
ReplicatedMergeTreeQuorumEntry quorum_entry(value);
/// If the node has time to disappear, and then appear again for the next insert.
@ -499,6 +501,8 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
if (!event->tryWait(quorum_timeout_ms))
throw Exception("Timeout while waiting for quorum", ErrorCodes::TIMEOUT_EXCEEDED);
LOG_TRACE(log, "Quorum {} updated, will check quorum node still exists", quorum_info.status_path);
}
/// And what if it is possible that the current replica at this time has ceased to be active


@ -72,11 +72,27 @@ StorageMerge::StorageMerge(
const StorageID & table_id_,
const ColumnsDescription & columns_,
const String & source_database_,
const String & table_name_regexp_,
const Strings & source_tables_,
const Context & context_)
: IStorage(table_id_)
, source_database(source_database_)
, table_name_regexp(table_name_regexp_)
, source_tables(std::in_place, source_tables_.begin(), source_tables_.end())
, global_context(context_.getGlobalContext())
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
setInMemoryMetadata(storage_metadata);
}
StorageMerge::StorageMerge(
const StorageID & table_id_,
const ColumnsDescription & columns_,
const String & source_database_,
const String & source_table_regexp_,
const Context & context_)
: IStorage(table_id_)
, source_database(source_database_)
, source_table_regexp(source_table_regexp_)
, global_context(context_.getGlobalContext())
{
StorageInMemoryMetadata storage_metadata;
@ -439,8 +455,17 @@ DatabaseTablesIteratorPtr StorageMerge::getDatabaseIterator(const Context & cont
e.addMessage("while getting table iterator of Merge table. Maybe caused by two Merge tables that will endlessly try to read each other's data");
throw;
}
auto database = DatabaseCatalog::instance().getDatabase(source_database);
auto table_name_match = [this](const String & table_name_) { return table_name_regexp.match(table_name_); };
auto table_name_match = [this](const String & table_name_) -> bool
{
if (source_tables)
return source_tables->count(table_name_);
else
return source_table_regexp->match(table_name_);
};
return database->getTablesIterator(context, table_name_match);
}


@ -48,7 +48,8 @@ public:
private:
String source_database;
OptimizedRegularExpression table_name_regexp;
std::optional<std::unordered_set<String>> source_tables;
std::optional<OptimizedRegularExpression> source_table_regexp;
const Context & global_context;
using StorageWithLockAndName = std::tuple<StoragePtr, TableLockHolder, String>;
@ -72,7 +73,14 @@ protected:
const StorageID & table_id_,
const ColumnsDescription & columns_,
const String & source_database_,
const String & table_name_regexp_,
const Strings & source_tables_,
const Context & context_);
StorageMerge(
const StorageID & table_id_,
const ColumnsDescription & columns_,
const String & source_database_,
const String & source_table_regexp_,
const Context & context_);
Pipe createSources(


@ -3235,6 +3235,8 @@ void StorageReplicatedMergeTree::updateQuorum(const String & part_name, bool is_
ReplicatedMergeTreeQuorumEntry quorum_entry(value);
if (quorum_entry.part_name != part_name)
{
LOG_TRACE(log, "Quorum {}, already achieved for part {} current part {}",
quorum_status_path, part_name, quorum_entry.part_name);
/// The quorum has already been achieved. Moreover, another INSERT with a quorum has already started.
break;
}
@ -3244,6 +3246,8 @@ void StorageReplicatedMergeTree::updateQuorum(const String & part_name, bool is_
if (quorum_entry.replicas.size() >= quorum_entry.required_number_of_replicas)
{
/// The quorum is reached. Delete the node, and update information about the last part that was successfully written with quorum.
LOG_TRACE(log, "Got {} replicas confirmed quorum {}, going to remove node",
quorum_entry.replicas.size(), quorum_status_path);
Coordination::Requests ops;
Coordination::Responses responses;
@ -3291,6 +3295,8 @@ void StorageReplicatedMergeTree::updateQuorum(const String & part_name, bool is_
}
else
{
LOG_TRACE(log, "Quorum {} still not satisfied (have only {} replicas), updating node",
quorum_status_path, quorum_entry.replicas.size());
/// We update the node, registering there one more replica.
auto code = zookeeper->trySet(quorum_status_path, quorum_entry.toString(), stat.version);


@ -40,8 +40,6 @@
namespace DB
{
#define INDEX_BUFFER_SIZE 4096
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
@ -319,7 +317,7 @@ Pipe StorageStripeLog::read(
return Pipe(std::make_shared<NullSource>(metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID())));
}
CompressedReadBufferFromFile index_in(disk->readFile(index_file, INDEX_BUFFER_SIZE));
CompressedReadBufferFromFile index_in(disk->readFile(index_file, 4096));
std::shared_ptr<const IndexForNativeFormat> index{std::make_shared<IndexForNativeFormat>(index_in, column_names_set)};
size_t size = index->blocks.size();


@ -272,17 +272,28 @@ Pipe StorageSystemColumns::read(
Pipes pipes;
{
Databases databases = DatabaseCatalog::instance().getDatabases();
/// Add `database` column.
MutableColumnPtr database_column_mut = ColumnString::create();
for (const auto & database : databases)
const auto databases = DatabaseCatalog::instance().getDatabases();
for (const auto & [database_name, database] : databases)
{
if (database_name == DatabaseCatalog::TEMPORARY_DATABASE)
continue; /// We don't want to show the internal database for temporary tables in system.columns
/// We are skipping "Lazy" database because we cannot afford initialization of all its tables.
/// This should be documented.
if (database.second->getEngineName() != "Lazy")
database_column_mut->insert(database.first);
if (database->getEngineName() != "Lazy")
database_column_mut->insert(database_name);
}
Tables external_tables;
if (context.hasSessionContext())
{
external_tables = context.getSessionContext().getExternalTables();
if (!external_tables.empty())
database_column_mut->insertDefault(); /// Empty database for external tables.
}
block_to_filter.insert(ColumnWithTypeAndName(std::move(database_column_mut), std::make_shared<DataTypeString>(), "database"));
@ -297,29 +308,36 @@ Pipe StorageSystemColumns::read(
}
ColumnPtr & database_column = block_to_filter.getByName("database").column;
size_t rows = database_column->size();
/// Add `table` column.
MutableColumnPtr table_column_mut = ColumnString::create();
IColumn::Offsets offsets(rows);
for (size_t i = 0; i < rows; ++i)
IColumn::Offsets offsets(database_column->size());
for (size_t i = 0; i < database_column->size(); ++i)
{
const std::string database_name = (*database_column)[i].get<std::string>();
const DatabasePtr database = databases.at(database_name);
offsets[i] = i ? offsets[i - 1] : 0;
for (auto iterator = database->getTablesIterator(context); iterator->isValid(); iterator->next())
if (database_name.empty())
{
if (const auto & table = iterator->table())
for (auto & [table_name, table] : external_tables)
{
const String & table_name = iterator->name();
storages.emplace(std::piecewise_construct,
std::forward_as_tuple(database_name, table_name),
std::forward_as_tuple(table));
storages[{"", table_name}] = table;
table_column_mut->insert(table_name);
++offsets[i];
}
}
else
{
const DatabasePtr & database = databases.at(database_name);
for (auto iterator = database->getTablesIterator(context); iterator->isValid(); iterator->next())
{
if (const auto & table = iterator->table())
{
const String & table_name = iterator->name();
storages[{database_name, table_name}] = table;
table_column_mut->insert(table_name);
}
}
}
offsets[i] = table_column_mut->size();
}
database_column = database_column->replicate(offsets);


@ -25,17 +25,20 @@ void StorageSystemDatabases::fillData(MutableColumns & res_columns, const Contex
const auto access = context.getAccess();
const bool check_access_for_databases = !access->isGranted(AccessType::SHOW_DATABASES);
auto databases = DatabaseCatalog::instance().getDatabases();
for (const auto & database : databases)
const auto databases = DatabaseCatalog::instance().getDatabases();
for (const auto & [database_name, database] : databases)
{
if (check_access_for_databases && !access->isGranted(AccessType::SHOW_DATABASES, database.first))
if (check_access_for_databases && !access->isGranted(AccessType::SHOW_DATABASES, database_name))
continue;
res_columns[0]->insert(database.first);
res_columns[1]->insert(database.second->getEngineName());
res_columns[2]->insert(context.getPath() + database.second->getDataPath());
res_columns[3]->insert(database.second->getMetadataPath());
res_columns[4]->insert(database.second->getUUID());
if (database_name == DatabaseCatalog::TEMPORARY_DATABASE)
continue; /// We don't want to show the internal database for temporary tables in system.databases
res_columns[0]->insert(database_name);
res_columns[1]->insert(database->getEngineName());
res_columns[2]->insert(context.getPath() + database->getDataPath());
res_columns[3]->insert(database->getMetadataPath());
res_columns[4]->insert(database->getUUID());
}
}


@ -65,8 +65,15 @@ StorageSystemTables::StorageSystemTables(const StorageID & table_id_)
static ColumnPtr getFilteredDatabases(const ASTPtr & query, const Context & context)
{
MutableColumnPtr column = ColumnString::create();
for (const auto & db : DatabaseCatalog::instance().getDatabases())
column->insert(db.first);
const auto databases = DatabaseCatalog::instance().getDatabases();
for (const auto & database_name : databases | boost::adaptors::map_keys)
{
if (database_name == DatabaseCatalog::TEMPORARY_DATABASE)
continue; /// We don't want to show the internal database for temporary tables in system.tables
column->insert(database_name);
}
Block block { ColumnWithTypeAndName(std::move(column), std::make_shared<DataTypeString>(), "database") };
VirtualColumnUtils::filterBlockWithQuery(query, block, context);
@ -192,7 +199,11 @@ protected:
// create_table_query
if (columns_mask[src_index++])
res_columns[res_index++]->insertDefault();
{
auto temp_db = DatabaseCatalog::instance().getDatabaseForTemporaryTables();
ASTPtr ast = temp_db ? temp_db->tryGetCreateTableQuery(table.second->getStorageID().getTableName(), context) : nullptr;
res_columns[res_index++]->insert(ast ? queryToString(ast) : "");
}
// engine_full
if (columns_mask[src_index++])


@ -2,7 +2,7 @@
#include "registerTableFunctions.h"
#if USE_HDFS
#include <Storages/StorageHDFS.h>
#include <Storages/HDFS/StorageHDFS.h>
#include <Storages/ColumnsDescription.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/TableFunctionHDFS.h>


@ -6,6 +6,7 @@
#include <TableFunctions/ITableFunction.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/Context.h>
#include <Access/ContextAccess.h>
#include <TableFunctions/TableFunctionMerge.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/registerTableFunctions.h>
@ -14,37 +15,24 @@
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int UNKNOWN_TABLE;
}
static NamesAndTypesList chooseColumns(const String & source_database, const String & table_name_regexp_, const Context & context)
namespace
{
OptimizedRegularExpression table_name_regexp(table_name_regexp_);
auto table_name_match = [&](const String & table_name) { return table_name_regexp.match(table_name); };
StoragePtr any_table;
[[noreturn]] void throwNoTablesMatchRegexp(const String & source_database, const String & source_table_regexp)
{
auto database = DatabaseCatalog::instance().getDatabase(source_database);
auto iterator = database->getTablesIterator(context, table_name_match);
if (iterator->isValid())
if (const auto & table = iterator->table())
any_table = table;
throw Exception(
"Error while executing table function merge. In database " + source_database
+ " no one matches regular expression: " + source_table_regexp,
ErrorCodes::UNKNOWN_TABLE);
}
if (!any_table)
throw Exception("Error while executing table function merge. In database " + source_database + " no one matches regular expression: "
+ table_name_regexp_, ErrorCodes::UNKNOWN_TABLE);
return any_table->getInMemoryMetadataPtr()->getColumns().getAllPhysical();
}
void TableFunctionMerge::parseArguments(const ASTPtr & ast_function, const Context & context)
{
ASTs & args_func = ast_function->children;
@ -65,21 +53,64 @@ void TableFunctionMerge::parseArguments(const ASTPtr & ast_function, const Conte
args[1] = evaluateConstantExpressionAsLiteral(args[1], context);
source_database = args[0]->as<ASTLiteral &>().value.safeGet<String>();
table_name_regexp = args[1]->as<ASTLiteral &>().value.safeGet<String>();
source_table_regexp = args[1]->as<ASTLiteral &>().value.safeGet<String>();
}
const Strings & TableFunctionMerge::getSourceTables(const Context & context) const
{
if (source_tables)
return *source_tables;
auto database = DatabaseCatalog::instance().getDatabase(source_database);
OptimizedRegularExpression re(source_table_regexp);
auto table_name_match = [&](const String & table_name_) { return re.match(table_name_); };
auto access = context.getAccess();
bool granted_show_on_all_tables = access->isGranted(AccessType::SHOW_TABLES, source_database);
bool granted_select_on_all_tables = access->isGranted(AccessType::SELECT, source_database);
source_tables.emplace();
for (auto it = database->getTablesIterator(context, table_name_match); it->isValid(); it->next())
{
if (!it->table())
continue;
bool granted_show = granted_show_on_all_tables || access->isGranted(AccessType::SHOW_TABLES, source_database, it->name());
if (!granted_show)
continue;
if (!granted_select_on_all_tables)
access->checkAccess(AccessType::SELECT, source_database, it->name());
source_tables->emplace_back(it->name());
}
if (source_tables->empty())
throwNoTablesMatchRegexp(source_database, source_table_regexp);
return *source_tables;
}
ColumnsDescription TableFunctionMerge::getActualTableStructure(const Context & context) const
{
return ColumnsDescription{chooseColumns(source_database, table_name_regexp, context)};
for (const auto & table_name : getSourceTables(context))
{
auto storage = DatabaseCatalog::instance().tryGetTable(StorageID{source_database, table_name}, context);
if (storage)
return ColumnsDescription{storage->getInMemoryMetadataPtr()->getColumns().getAllPhysical()};
}
throwNoTablesMatchRegexp(source_database, source_table_regexp);
}
StoragePtr TableFunctionMerge::executeImpl(const ASTPtr & /*ast_function*/, const Context & context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
auto res = StorageMerge::create(
StorageID(getDatabaseName(), table_name),
getActualTableStructure(context),
source_database,
table_name_regexp,
getSourceTables(context),
context);
res->startup();


@ -19,11 +19,13 @@ private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns) const override;
const char * getStorageTypeName() const override { return "Merge"; }
const Strings & getSourceTables(const Context & context) const;
ColumnsDescription getActualTableStructure(const Context & context) const override;
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
String source_database;
String table_name_regexp;
String source_table_regexp;
mutable std::optional<Strings> source_tables;
};


@ -175,6 +175,8 @@ def run_single_test(args, ext, server_logs_level, client_options, case_file, std
except OSError as e:
if e.errno != ESRCH:
raise
total_time = (datetime.now() - start_time).total_seconds()
return clickhouse_proc_create, "", "Timeout dropping database {} after test".format(database), total_time
total_time = (datetime.now() - start_time).total_seconds()


@ -38,7 +38,8 @@ sudo -H pip install \
pytest-timeout \
redis \
tzlocal \
urllib3
urllib3 \
requests-kerberos
```
(highly not recommended) If you really want to use OS packages on modern debian/ubuntu instead of "pip": `sudo apt install -y docker docker-compose python3-pytest python3-dicttoxml python3-docker python3-pymysql python3-pymongo python3-tzlocal python3-kazoo python3-psycopg2 kafka-python python3-pytest-timeout python3-minio`


@ -137,6 +137,7 @@ class ClickHouseCluster:
self.with_rabbitmq = False
self.with_odbc_drivers = False
self.with_hdfs = False
self.with_kerberized_hdfs = False
self.with_mongo = False
self.with_net_trics = False
self.with_redis = False
@ -172,7 +173,7 @@ class ClickHouseCluster:
macros=None,
with_zookeeper=False, with_mysql=False, with_kafka=False, with_kerberized_kafka=False, with_rabbitmq=False,
clickhouse_path_dir=None,
with_odbc_drivers=False, with_postgres=False, with_hdfs=False, with_mongo=False,
with_odbc_drivers=False, with_postgres=False, with_hdfs=False, with_kerberized_hdfs=False, with_mongo=False,
with_redis=False, with_minio=False, with_cassandra=False,
hostname=None, env_variables=None, image="yandex/clickhouse-integration-test", tag=None,
stay_alive=False, ipv4_address=None, ipv6_address=None, with_installed_binary=False, tmpfs=None,
@ -210,6 +211,7 @@ class ClickHouseCluster:
with_kafka=with_kafka,
with_kerberized_kafka=with_kerberized_kafka,
with_rabbitmq=with_rabbitmq,
with_kerberized_hdfs=with_kerberized_hdfs,
with_mongo=with_mongo,
with_redis=with_redis,
with_minio=with_minio,
@ -315,6 +317,14 @@ class ClickHouseCluster:
p.join(docker_compose_yml_dir, 'docker_compose_hdfs.yml')]
cmds.append(self.base_hdfs_cmd)
if with_kerberized_hdfs and not self.with_kerberized_hdfs:
self.with_kerberized_hdfs = True
self.base_cmd.extend(['--file', p.join(docker_compose_yml_dir, 'docker_compose_kerberized_hdfs.yml')])
self.base_kerberized_hdfs_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
self.project_name, '--file',
p.join(docker_compose_yml_dir, 'docker_compose_kerberized_hdfs.yml')]
cmds.append(self.base_kerberized_hdfs_cmd)
if with_mongo and not self.with_mongo:
self.with_mongo = True
self.base_cmd.extend(['--file', p.join(docker_compose_yml_dir, 'docker_compose_mongo.yml')])
@ -476,12 +486,35 @@ class ClickHouseCluster:
raise Exception("Cannot wait ZooKeeper container")
def make_hdfs_api(self, timeout=60, kerberized=False):
if kerberized:
keytab = p.abspath(p.join(self.instances['node1'].path, "secrets/clickhouse.keytab"))
krb_conf = p.abspath(p.join(self.instances['node1'].path, "secrets/krb_long.conf"))
hdfs_ip = self.get_instance_ip('kerberizedhdfs1')
# print("kerberizedhdfs1 ip ", hdfs_ip)
kdc_ip = self.get_instance_ip('hdfskerberos')
# print("kdc_ip ", kdc_ip)
self.hdfs_api = HDFSApi(user="root",
timeout=timeout,
kerberized=True,
principal="root@TEST.CLICKHOUSE.TECH",
keytab=keytab,
krb_conf=krb_conf,
host="kerberizedhdfs1",
protocol="http",
proxy_port=50070,
data_port=1006,
hdfs_ip=hdfs_ip,
kdc_ip=kdc_ip)
else:
self.hdfs_api = HDFSApi(user="root", host="hdfs1")
def wait_hdfs_to_start(self, timeout=60):
hdfs_api = HDFSApi("root")
start = time.time()
while time.time() - start < timeout:
try:
hdfs_api.write_data("/somefilewithrandomname222", "1")
self.hdfs_api.write_data("/somefilewithrandomname222", "1")
print("Connected to HDFS and SafeMode disabled! ")
return
except Exception as ex:
@ -620,6 +653,7 @@ class ClickHouseCluster:
self.wait_schema_registry_to_start(120)
if self.with_kerberized_kafka and self.base_kerberized_kafka_cmd:
print('Setup kerberized kafka')
env = os.environ.copy()
env['KERBERIZED_KAFKA_DIR'] = instance.path + '/'
subprocess.check_call(self.base_kerberized_kafka_cmd + common_opts + ['--renew-anon-volumes'], env=env)
@ -631,8 +665,17 @@ class ClickHouseCluster:
if self.with_hdfs and self.base_hdfs_cmd:
print('Setup HDFS')
subprocess_check_call(self.base_hdfs_cmd + common_opts)
self.make_hdfs_api()
self.wait_hdfs_to_start(120)
if self.with_kerberized_hdfs and self.base_kerberized_hdfs_cmd:
print('Setup kerberized HDFS')
env = os.environ.copy()
env['KERBERIZED_HDFS_DIR'] = instance.path + '/'
subprocess.check_call(self.base_kerberized_hdfs_cmd + common_opts, env=env)
self.make_hdfs_api(kerberized=True)
self.wait_hdfs_to_start(timeout=300)
if self.with_mongo and self.base_mongo_cmd:
print('Setup Mongo')
subprocess_check_call(self.base_mongo_cmd + common_opts)
@ -840,8 +883,8 @@ class ClickHouseInstance:
def __init__(
self, cluster, base_path, name, base_config_dir, custom_main_configs, custom_user_configs,
custom_dictionaries,
macros, with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, with_kerberized_kafka, with_rabbitmq, with_mongo,
with_redis, with_minio,
macros, with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, with_kerberized_kafka, with_rabbitmq, with_kerberized_hdfs,
with_mongo, with_redis, with_minio,
with_cassandra, server_bin_path, odbc_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers,
hostname=None, env_variables=None,
image="yandex/clickhouse-integration-test", tag="latest",
@ -871,6 +914,7 @@ class ClickHouseInstance:
self.with_kafka = with_kafka
self.with_kerberized_kafka = with_kerberized_kafka
self.with_rabbitmq = with_rabbitmq
self.with_kerberized_hdfs = with_kerberized_hdfs
self.with_mongo = with_mongo
self.with_redis = with_redis
self.with_minio = with_minio
@ -885,7 +929,7 @@ class ClickHouseInstance:
else:
self.odbc_ini_path = ""
if with_kerberized_kafka:
if with_kerberized_kafka or with_kerberized_hdfs:
self.keytab_path = '- ' + os.path.dirname(self.docker_compose_path) + "/secrets:/tmp/keytab"
self.krb5_conf = '- ' + os.path.dirname(self.docker_compose_path) + "/secrets/krb.conf:/etc/krb5.conf:ro"
else:
@ -1231,7 +1275,7 @@ class ClickHouseInstance:
if self.with_zookeeper:
shutil.copy(self.zookeeper_config_path, conf_d_dir)
if self.with_kerberized_kafka:
if self.with_kerberized_kafka or self.with_kerberized_hdfs:
shutil.copytree(self.kerberos_secrets_dir, p.abspath(p.join(self.path, 'secrets')))
# Copy config.d configs
@ -1272,6 +1316,9 @@ class ClickHouseInstance:
if self.with_kerberized_kafka:
depends_on.append("kerberized_kafka1")
if self.with_kerberized_hdfs:
depends_on.append("kerberizedhdfs1")
if self.with_rabbitmq:
depends_on.append("rabbitmq1")

View File

@ -2,45 +2,145 @@
import io
import gzip
import subprocess
import time
from tempfile import NamedTemporaryFile
import requests
import requests_kerberos as reqkerb
import socket
import tempfile
import logging
import os
g_dns_hook = None
def custom_getaddrinfo(*args):
# print("from custom_getaddrinfo g_dns_hook is None ", g_dns_hook is None)
ret = g_dns_hook.custom_getaddrinfo(*args)
# print("g_dns_hook.custom_getaddrinfo result", ret)
return ret
class mk_krb_conf(object):
def __init__(self, krb_conf, kdc_ip):
self.krb_conf = krb_conf
self.kdc_ip = kdc_ip
self.amended_krb_conf = None
def __enter__(self):
with open(self.krb_conf) as f:
content = f.read()
amended_content = content.replace('hdfskerberos', self.kdc_ip)
self.amended_krb_conf = tempfile.NamedTemporaryFile(delete=False, mode="w+")
self.amended_krb_conf.write(amended_content)
self.amended_krb_conf.close()
return self.amended_krb_conf.name
def __exit__(self, type, value, traceback):
if self.amended_krb_conf is not None:
self.amended_krb_conf.close()
# tweak dns resolution to connect to localhost where api_host is in URL
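# The hook below monkey-patches socket.getaddrinfo, which requests/urllib3 use for name resolution.
# While it is active, lookups for the HDFS host on the WebHDFS proxy and data ports resolve to
# 127.0.0.1, so the redirects returned by the namenode (which point at in-container hostnames)
# stay reachable through the ports published on localhost. This is a descriptive note; the exact
# behaviour is whatever custom_getaddrinfo below implements.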
class dns_hook(object):
def __init__(self, hdfs_api):
# print("dns_hook.init ", hdfs_api.kerberized, hdfs_api.host, hdfs_api.data_port, hdfs_api.proxy_port)
self.hdfs_api = hdfs_api
def __enter__(self):
global g_dns_hook
g_dns_hook = self
# print("g_dns_hook is None ", g_dns_hook is None)
self.original_getaddrinfo = socket.getaddrinfo
socket.getaddrinfo = custom_getaddrinfo
return self
def __exit__(self, type, value, traceback):
global g_dns_hook
g_dns_hook = None
socket.getaddrinfo = self.original_getaddrinfo
def custom_getaddrinfo(self, *args):
(hostname, port) = args[:2]
# print("top of custom_getaddrinfo", hostname, port)
if hostname == self.hdfs_api.host and (port == self.hdfs_api.data_port or port == self.hdfs_api.proxy_port):
# print("dns_hook substitute")
return [(socket.AF_INET, 1, 6, '', ("127.0.0.1", port))]
else:
return self.original_getaddrinfo(*args)
class HDFSApi(object):
def __init__(self, user):
self.host = "localhost"
self.http_proxy_port = "50070"
self.http_data_port = "50075"
def __init__(self, user, timeout=100, kerberized=False, principal=None,
keytab=None, krb_conf=None,
host = "localhost", protocol = "http",
proxy_port = 50070, data_port = 50075, hdfs_ip = None, kdc_ip = None):
self.host = host
self.protocol = protocol
self.proxy_port = proxy_port
self.data_port = data_port
self.user = user
self.kerberized = kerberized
self.principal = principal
self.keytab = keytab
self.timeout = timeout
self.hdfs_ip = hdfs_ip
self.kdc_ip = kdc_ip
self.krb_conf = krb_conf
# logging.basicConfig(level=logging.DEBUG)
# logging.getLogger().setLevel(logging.DEBUG)
# requests_log = logging.getLogger("requests.packages.urllib3")
# requests_log.setLevel(logging.DEBUG)
# requests_log.propagate = True
if kerberized:
self._run_kinit()
self.kerberos_auth = reqkerb.HTTPKerberosAuth(mutual_authentication=reqkerb.DISABLED, hostname_override=self.host, principal=self.principal)
#principal=self.principal,
#hostname_override=self.host, principal=self.principal)
# , mutual_authentication=reqkerb.REQUIRED, force_preemptive=True)
else:
self.kerberos_auth = None
def _run_kinit(self):
if self.principal is None or self.keytab is None:
raise Exception("kerberos principal and keytab are required")
with mk_krb_conf(self.krb_conf, self.kdc_ip) as instantiated_krb_conf:
# print("instantiated_krb_conf ", instantiated_krb_conf)
os.environ["KRB5_CONFIG"] = instantiated_krb_conf
cmd = "(kinit -R -t {keytab} -k {principal} || (sleep 5 && kinit -R -t {keytab} -k {principal})) ; klist".format(instantiated_krb_conf=instantiated_krb_conf, keytab=self.keytab, principal=self.principal)
# print(cmd)
start = time.time()
while time.time() - start < self.timeout:
try:
subprocess.call(cmd, shell=True)
print("KDC started, kinit successfully run")
return
except Exception as ex:
print("Can't run kinit ... waiting {}".format(str(ex)))
time.sleep(1)
raise Exception("Kinit running failure")
def read_data(self, path, universal_newlines=True):
response = requests.get(
"http://{host}:{port}/webhdfs/v1{path}?op=OPEN".format(host=self.host, port=self.http_proxy_port,
path=path), allow_redirects=False)
with dns_hook(self):
response = requests.get("{protocol}://{host}:{port}/webhdfs/v1{path}?op=OPEN".format(protocol=self.protocol, host=self.host, port=self.proxy_port, path=path), headers={'host': 'localhost'}, allow_redirects=False, verify=False, auth=self.kerberos_auth)
if response.status_code != 307:
response.raise_for_status()
additional_params = '&'.join(response.headers['Location'].split('&')[1:2])
response_data = requests.get(
"http://{host}:{port}/webhdfs/v1{path}?op=OPEN&{params}".format(host=self.host, port=self.http_data_port,
path=path, params=additional_params))
# additional_params = '&'.join(response.headers['Location'].split('&')[1:2])
url = "{location}".format(location=response.headers['Location'])
# print("redirected to ", url)
with dns_hook(self):
response_data = requests.get(url,
headers={'host': 'localhost'},
verify=False, auth=self.kerberos_auth)
if response_data.status_code != 200:
response_data.raise_for_status()
if universal_newlines:
return response_data.text
else:
return response_data.content
# Requests can't put file
def _curl_to_put(self, filename, path, params):
url = "http://{host}:{port}/webhdfs/v1{path}?op=CREATE&{params}".format(host=self.host,
port=self.http_data_port, path=path,
params=params)
cmd = "curl -s -i -X PUT -T {fname} '{url}'".format(fname=filename, url=url)
output = subprocess.check_output(cmd, shell=True, universal_newlines=True)
return output
def write_data(self, path, content):
named_file = NamedTemporaryFile(mode='wb+')
fpath = named_file.name
@ -48,19 +148,44 @@ class HDFSApi(object):
content = content.encode()
named_file.write(content)
named_file.flush()
response = requests.put(
"http://{host}:{port}/webhdfs/v1{path}?op=CREATE".format(host=self.host, port=self.http_proxy_port,
path=path, user=self.user),
allow_redirects=False
)
if self.kerberized:
self._run_kinit()
self.kerberos_auth = reqkerb.HTTPKerberosAuth(mutual_authentication=reqkerb.DISABLED, hostname_override=self.host, principal=self.principal)
# print(self.kerberos_auth)
with dns_hook(self):
response = requests.put(
"{protocol}://{host}:{port}/webhdfs/v1{path}?op=CREATE".format(protocol=self.protocol, host=self.host,
port=self.proxy_port,
path=path, user=self.user),
allow_redirects=False,
headers={'host': 'localhost'},
params={'overwrite' : 'true'},
verify=False, auth=self.kerberos_auth
)
if response.status_code != 307:
# print(response.headers)
response.raise_for_status()
additional_params = '&'.join(
response.headers['Location'].split('&')[1:2] + ["user.name={}".format(self.user), "overwrite=true"])
output = self._curl_to_put(fpath, path, additional_params)
if "201 Created" not in output:
raise Exception("Can't create file on hdfs:\n {}".format(output))
with dns_hook(self), open(fpath, mode="rb") as fh:
file_data = fh.read()
protocol = "http" # self.protocol
response = requests.put(
"{location}".format(location=response.headers['Location']),
data=file_data,
headers={'content-type':'text/plain', 'host': 'localhost'},
params={'file': path, 'user.name' : self.user},
allow_redirects=False, verify=False, auth=self.kerberos_auth
)
# print(response)
if response.status_code != 201:
response.raise_for_status()
def write_gzip_data(self, path, content):
if isinstance(content, str):

View File

@ -1,6 +1,5 @@
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.hdfs_api import HDFSApi
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/config_with_hosts.xml'])
@ -101,9 +100,8 @@ def test_table_function_remote(start_cluster):
def test_redirect(start_cluster):
hdfs_api = HDFSApi("root")
hdfs_api.write_data("/simple_storage", "1\t\n")
assert hdfs_api.read_data("/simple_storage") == "1\t\n"
start_cluster.hdfs_api.write_data("/simple_storage", "1\t\n")
assert start_cluster.hdfs_api.read_data("/simple_storage") == "1\t\n"
node7.query(
"CREATE TABLE table_test_7_1 (word String) ENGINE=URL('http://hdfs1:50070/webhdfs/v1/simple_storage?op=OPEN&namenoderpcaddress=hdfs1:9000&offset=0', CSV)")
assert "not allowed" in node7.query_and_get_error("SET max_http_get_redirects=1; SELECT * from table_test_7_1")

View File

@ -17,9 +17,8 @@ def started_cluster():
def test_url_without_redirect(started_cluster):
hdfs_api = HDFSApi("root")
hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
assert hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
started_cluster.hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
assert started_cluster.hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
# access datanode port directly
node1.query(
@ -28,9 +27,8 @@ def test_url_without_redirect(started_cluster):
def test_url_with_redirect_not_allowed(started_cluster):
hdfs_api = HDFSApi("root")
hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
assert hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
started_cluster.hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
assert started_cluster.hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
# access proxy port without allowing redirects
node1.query(
@ -40,9 +38,8 @@ def test_url_with_redirect_not_allowed(started_cluster):
def test_url_with_redirect_allowed(started_cluster):
hdfs_api = HDFSApi("root")
hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
assert hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
started_cluster.hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
assert started_cluster.hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
# access proxy port with allowing redirects
# http://localhost:50070/webhdfs/v1/b?op=OPEN&namenoderpcaddress=hdfs1:9000&offset=0

View File

@ -24,18 +24,14 @@ def started_cluster():
def test_read_write_storage(started_cluster):
hdfs_api = HDFSApi("root")
node1.query(
"create table SimpleHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/simple_storage', 'TSV')")
node1.query("insert into SimpleHDFSStorage values (1, 'Mark', 72.53)")
assert hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
assert started_cluster.hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
assert node1.query("select * from SimpleHDFSStorage") == "1\tMark\t72.53\n"
def test_read_write_storage_with_globs(started_cluster):
hdfs_api = HDFSApi("root")
node1.query(
"create table HDFSStorageWithRange (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/storage{1..5}', 'TSV')")
node1.query(
@ -46,8 +42,8 @@ def test_read_write_storage_with_globs(started_cluster):
"create table HDFSStorageWithAsterisk (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/storage*', 'TSV')")
for i in ["1", "2", "3"]:
hdfs_api.write_data("/storage" + i, i + "\tMark\t72.53\n")
assert hdfs_api.read_data("/storage" + i) == i + "\tMark\t72.53\n"
started_cluster.hdfs_api.write_data("/storage" + i, i + "\tMark\t72.53\n")
assert started_cluster.hdfs_api.read_data("/storage" + i) == i + "\tMark\t72.53\n"
assert node1.query("select count(*) from HDFSStorageWithRange") == "3\n"
assert node1.query("select count(*) from HDFSStorageWithEnum") == "3\n"
@ -77,25 +73,23 @@ def test_read_write_storage_with_globs(started_cluster):
def test_read_write_table(started_cluster):
hdfs_api = HDFSApi("root")
data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
hdfs_api.write_data("/simple_table_function", data)
started_cluster.hdfs_api.write_data("/simple_table_function", data)
assert hdfs_api.read_data("/simple_table_function") == data
assert started_cluster.hdfs_api.read_data("/simple_table_function") == data
assert node1.query(
"select * from hdfs('hdfs://hdfs1:9000/simple_table_function', 'TSV', 'id UInt64, text String, number Float64')") == data
def test_write_table(started_cluster):
hdfs_api = HDFSApi("root")
node1.query(
"create table OtherHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/other_storage', 'TSV')")
node1.query("insert into OtherHDFSStorage values (10, 'tomas', 55.55), (11, 'jack', 32.54)")
result = "10\ttomas\t55.55\n11\tjack\t32.54\n"
assert hdfs_api.read_data("/other_storage") == result
assert started_cluster.hdfs_api.read_data("/other_storage") == result
assert node1.query("select * from OtherHDFSStorage order by id") == result
@ -120,15 +114,14 @@ def test_bad_hdfs_uri(started_cluster):
print(ex)
assert "Unable to open HDFS file" in str(ex)
@pytest.mark.timeout(800)
def test_globs_in_read_table(started_cluster):
hdfs_api = HDFSApi("root")
some_data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
globs_dir = "/dir_for_test_with_globs/"
files = ["dir1/dir_dir/file1", "dir2/file2", "simple_table_function", "dir/file", "some_dir/dir1/file",
"some_dir/dir2/file", "some_dir/file", "table1_function", "table2_function", "table3_function"]
for filename in files:
hdfs_api.write_data(globs_dir + filename, some_data)
started_cluster.hdfs_api.write_data(globs_dir + filename, some_data)
test_requests = [("dir{1..5}/dir_dir/file1", 1, 1),
("*_table_functio?", 1, 1),
@ -145,6 +138,7 @@ def test_globs_in_read_table(started_cluster):
for pattern, paths_amount, files_amount in test_requests:
inside_table_func = "'hdfs://hdfs1:9000" + globs_dir + pattern + "', 'TSV', 'id UInt64, text String, number Float64'"
print("inside_table_func ", inside_table_func)
assert node1.query("select * from hdfs(" + inside_table_func + ")") == paths_amount * some_data
assert node1.query("select count(distinct _path) from hdfs(" + inside_table_func + ")").rstrip() == str(
paths_amount)
@ -153,64 +147,61 @@ def test_globs_in_read_table(started_cluster):
def test_read_write_gzip_table(started_cluster):
hdfs_api = HDFSApi("root")
data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
hdfs_api.write_gzip_data("/simple_table_function.gz", data)
started_cluster.hdfs_api.write_gzip_data("/simple_table_function.gz", data)
assert hdfs_api.read_gzip_data("/simple_table_function.gz") == data
assert started_cluster.hdfs_api.read_gzip_data("/simple_table_function.gz") == data
assert node1.query(
"select * from hdfs('hdfs://hdfs1:9000/simple_table_function.gz', 'TSV', 'id UInt64, text String, number Float64')") == data
def test_read_write_gzip_table_with_parameter_gzip(started_cluster):
hdfs_api = HDFSApi("root")
data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
hdfs_api.write_gzip_data("/simple_table_function", data)
started_cluster.hdfs_api.write_gzip_data("/simple_table_function", data)
assert hdfs_api.read_gzip_data("/simple_table_function") == data
assert started_cluster.hdfs_api.read_gzip_data("/simple_table_function") == data
assert node1.query(
"select * from hdfs('hdfs://hdfs1:9000/simple_table_function', 'TSV', 'id UInt64, text String, number Float64', 'gzip')") == data
def test_read_write_table_with_parameter_none(started_cluster):
hdfs_api = HDFSApi("root")
data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
hdfs_api.write_data("/simple_table_function.gz", data)
started_cluster.hdfs_api.write_data("/simple_table_function.gz", data)
assert hdfs_api.read_data("/simple_table_function.gz") == data
assert started_cluster.hdfs_api.read_data("/simple_table_function.gz") == data
assert node1.query(
"select * from hdfs('hdfs://hdfs1:9000/simple_table_function.gz', 'TSV', 'id UInt64, text String, number Float64', 'none')") == data
def test_read_write_gzip_table_with_parameter_auto_gz(started_cluster):
hdfs_api = HDFSApi("root")
data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
hdfs_api.write_gzip_data("/simple_table_function.gz", data)
started_cluster.hdfs_api.write_gzip_data("/simple_table_function.gz", data)
assert hdfs_api.read_gzip_data("/simple_table_function.gz") == data
assert started_cluster.hdfs_api.read_gzip_data("/simple_table_function.gz") == data
assert node1.query(
"select * from hdfs('hdfs://hdfs1:9000/simple_table_function.gz', 'TSV', 'id UInt64, text String, number Float64', 'auto')") == data
def test_write_gz_storage(started_cluster):
hdfs_api = HDFSApi("root")
node1.query(
"create table GZHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/storage.gz', 'TSV')")
node1.query("insert into GZHDFSStorage values (1, 'Mark', 72.53)")
assert hdfs_api.read_gzip_data("/storage.gz") == "1\tMark\t72.53\n"
assert started_cluster.hdfs_api.read_gzip_data("/storage.gz") == "1\tMark\t72.53\n"
assert node1.query("select * from GZHDFSStorage") == "1\tMark\t72.53\n"
def test_write_gzip_storage(started_cluster):
hdfs_api = HDFSApi("root")
node1.query(
"create table GZIPHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/gzip_storage', 'TSV', 'gzip')")
node1.query("insert into GZIPHDFSStorage values (1, 'Mark', 72.53)")
assert hdfs_api.read_gzip_data("/gzip_storage") == "1\tMark\t72.53\n"
assert started_cluster.hdfs_api.read_gzip_data("/gzip_storage") == "1\tMark\t72.53\n"
assert node1.query("select * from GZIPHDFSStorage") == "1\tMark\t72.53\n"
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")
cluster.shutdown()

View File

@ -0,0 +1,13 @@
<yandex>
<hdfs>
<hadoop_kerberos_keytab>/tmp/keytab/clickhouse.keytab</hadoop_kerberos_keytab>
<hadoop_kerberos_principal>root@TEST.CLICKHOUSE.TECH</hadoop_kerberos_principal>
<hadoop_security_authentication>kerberos</hadoop_security_authentication>
</hdfs>
<hdfs_suser>
<hadoop_kerberos_principal>specuser@TEST.CLICKHOUSE.TECH</hadoop_kerberos_principal>
</hdfs_suser>
<hdfs_dedicatedcachepath>
<hadoop_security_kerberos_ticket_cache_path>/tmp/kerb_cache</hadoop_security_kerberos_ticket_cache_path>
</hdfs_dedicatedcachepath>
</yandex>
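<!-- Illustrative note: a section named hdfs_<user> is matched against the user part of the HDFS URL,
     so hdfs('hdfs://suser@kerberizedhdfs1:9000/...', ...) would pick up <hdfs_suser> above and
     hdfs://dedicatedcachepath@kerberizedhdfs1:9000/... would pick up <hdfs_dedicatedcachepath>,
     as exercised by the kerberized HDFS integration test further below. -->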

View File

@ -0,0 +1,11 @@
<yandex>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/log.log</log>
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
</logger>
</yandex>

View File

@ -0,0 +1,262 @@
#!/bin/bash
: "${HADOOP_PREFIX:=/usr/local/hadoop}"
cat >> $HADOOP_PREFIX/etc/hadoop/hadoop-env.sh <<EOF
export HADOOP_SECURE_DN_USER=hdfs
export HADOOP_SECURE_DN_PID_DIR=$HADOOP_PREFIX/pid
export HADOOP_SECURE_DN_LOG_DIR=$HADOOP_PREFIX/logs/hdfs
export JSVC_HOME=$HADOOP_PREFIX/sbin
EOF
$HADOOP_PREFIX/etc/hadoop/hadoop-env.sh
mkdir -p "${HADOOP_SECURE_DN_PID_DIR}"
mkdir -p "${HADOOP_SECURE_DN_LOG_DIR}"
rm /tmp/*.pid
# installing libraries if any - (resource urls added comma separated to the ACP system variable)
cd "${HADOOP_PREFIX}/share/hadoop/common" || exit
for cp in ${ACP//,/ }; do echo "== ${cp}"; curl -LO "${cp}" ; done;
cd - || exit
# altering the core-site configuration
sed "s/HOSTNAME/${HOSTNAME}/" /usr/local/hadoop/etc/hadoop/core-site.xml.template | grep -v '/configuration' > /usr/local/hadoop/etc/hadoop/core-site.xml
cat >> /usr/local/hadoop/etc/hadoop/core-site.xml << EOF
<property>
<name>hadoop.security.authentication</name>
<value>kerberos</value> <!-- A value of "simple" would disable security. -->
</property>
<property>
<name>hadoop.security.authorization</name>
<value>true</value>
</property>
<property>
<name>fs.defaultFS</name>
<value>hdfs://kerberizedhdfs1:9000</value>
</property>
<property>
<name>fs.default.name</name>
<value>hdfs://kerberizedhdfs1:9000</value>
</property>
<!--
<property>
<name>hadoop.rpc.protection</name>
<value>privacy</value>
</property>
-->
</configuration>
EOF
cat > /usr/local/hadoop/etc/hadoop/hdfs-site.xml << EOF
<configuration>
<!--
<property>
<name>dfs.permissions.enabled</name>
<value>false</value>
</property>
-->
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<!-- General HDFS security config -->
<property>
<name>dfs.block.access.token.enable</name>
<value>true</value>
</property>
<!-- NameNode security config -->
<property>
<name>dfs.namenode.keytab.file</name>
<value>/usr/local/hadoop/etc/hadoop/conf/hdfs.keytab</value> <!-- path to the HDFS keytab -->
</property>
<property>
<name>dfs.namenode.kerberos.principal</name>
<value>hdfs/_HOST@TEST.CLICKHOUSE.TECH</value>
</property>
<property>
<name>dfs.namenode.kerberos.internal.spnego.principal</name>
<value>HTTP/_HOST@TEST.CLICKHOUSE.TECH</value>
</property>
<!-- Secondary NameNode security config -->
<property>
<name>dfs.secondary.namenode.keytab.file</name>
<value>/usr/local/hadoop/etc/hadoop/conf/hdfs.keytab</value> <!-- path to the HDFS keytab -->
</property>
<property>
<name>dfs.secondary.namenode.kerberos.principal</name>
<value>hdfs/_HOST@TEST.CLICKHOUSE.TECH</value>
</property>
<property>
<name>dfs.secondary.namenode.kerberos.internal.spnego.principal</name>
<value>HTTP/_HOST@TEST.CLICKHOUSE.TECH</value>
</property>
<!-- DataNode security config
<property>
<name>dfs.data.transfer.protection</name>
<value>integrity</value>
</property>
-->
<property>
<name>dfs.datanode.data.dir.perm</name>
<value>700</value>
</property>
<property>
<name>dfs.datanode.address</name>
<value>0.0.0.0:1004</value>
</property>
<property>
<name>dfs.datanode.http.address</name>
<value>0.0.0.0:1006</value>
</property>
<!--
<property>
<name>dfs.http.policy</name>
<value>HTTPS_ONLY</value>
</property>
-->
<property>
<name>dfs.datanode.keytab.file</name>
<value>/usr/local/hadoop/etc/hadoop/conf/hdfs.keytab</value> <!-- path to the HDFS keytab -->
</property>
<property>
<name>dfs.datanode.kerberos.principal</name>
<value>hdfs/_HOST@TEST.CLICKHOUSE.TECH</value>
</property>
<!-- Web Authentication config -->
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
</property>
<property>
<name>dfs.encrypt.data.transfer</name>
<value>false</value>
</property>
<property>
<name>dfs.web.authentication.kerberos.principal</name>
<value>HTTP/_HOST@TEST.CLICKHOUSE.TECH</value>
</property>
<property>
<name>dfs.web.authentication.kerberos.keytab</name>
<value>/usr/local/hadoop/etc/hadoop/conf/hdfs.keytab</value> <!-- path to the HDFS keytab -->
</property>
</configuration>
EOF
# cat > /usr/local/hadoop/etc/hadoop/ssl-server.xml << EOF
# <configuration>
# <property>
# <name>ssl.server.truststore.location</name>
# <value>/usr/local/hadoop/etc/hadoop/conf/hdfs.jks</value>
# </property>
# <property>
# <name>ssl.server.truststore.password</name>
# <value>masterkey</value>
# </property>
# <property>
# <name>ssl.server.keystore.location</name>
# <value>/usr/local/hadoop/etc/hadoop/conf/hdfs.jks</value>
# </property>
# <property>
# <name>ssl.server.keystore.password</name>
# <value>masterkey</value>
# </property>
# <property>
# <name>ssl.server.keystore.keypassword</name>
# <value>masterkey</value>
# </property>
# </configuration>
# EOF
cat > /usr/local/hadoop/etc/hadoop/log4j.properties << EOF
# Set everything to be logged to the console
log4j.rootCategory=DEBUG, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Set the default spark-shell log level to WARN. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=INFO
# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=DEBUG
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
log4j.logger.org.apache.spark.deploy=DEBUG
log4j.logger.org.apache.spark.executor=DEBUG
log4j.logger.org.apache.spark.scheduler=DEBUG
EOF
useradd -u 1098 hdfs
# keytool -genkey -alias kerberized_hdfs1.test.clickhouse.tech -keyalg rsa -keysize 1024 -dname "CN=kerberized_hdfs1.test.clickhouse.tech" -keypass masterkey -keystore /usr/local/hadoop/etc/hadoop/conf/hdfs.jks -storepass masterkey
keytool -genkey -alias kerberizedhdfs1 -keyalg rsa -keysize 1024 -dname "CN=kerberizedhdfs1" -keypass masterkey -keystore /usr/local/hadoop/etc/hadoop/conf/hdfs.jks -storepass masterkey
chmod g+r /usr/local/hadoop/etc/hadoop/conf/hdfs.jks
service sshd start
# yum --quiet --assumeyes install krb5-workstation.x86_64
# yum --quiet --assumeyes install tcpdump
# cd /tmp
# curl http://archive.apache.org/dist/commons/daemon/source/commons-daemon-1.0.15-src.tar.gz -o commons-daemon-1.0.15-src.tar.gz
# tar xzf commons-daemon-1.0.15-src.tar.gz
# cd commons-daemon-1.0.15-src/src/native/unix
# ./configure && make
# cp ./jsvc /usr/local/hadoop/sbin
until kinit -kt /usr/local/hadoop/etc/hadoop/conf/hdfs.keytab hdfs/kerberizedhdfs1@TEST.CLICKHOUSE.TECH; do sleep 2; done
echo "KDC is up and ready to go... starting up"
$HADOOP_PREFIX/sbin/start-dfs.sh
$HADOOP_PREFIX/sbin/start-yarn.sh
$HADOOP_PREFIX/sbin/mr-jobhistory-daemon.sh start historyserver
chmod a+r /usr/local/hadoop/etc/hadoop/conf/hdfs.keytab # make the hdfs keytab readable instead of creating a dedicated keytab for hdfsuser
$HADOOP_PREFIX/sbin/start-secure-dns.sh
sleep 3
/usr/local/hadoop/bin/hdfs dfsadmin -safemode leave
/usr/local/hadoop/bin/hdfs dfs -mkdir /user/specuser
/usr/local/hadoop/bin/hdfs dfs -chown specuser /user/specuser
kdestroy
# adduser --groups hdfs hdfsuser
# /usr/local/hadoop/sbin/hadoop-daemon.sh --config /usr/local/hadoop/etc/hadoop/ --script /usr/local/hadoop/sbin/hdfs start namenode
# /usr/local/hadoop/sbin/hadoop-daemon.sh --config /usr/local/hadoop/etc/hadoop/ --script /usr/local/hadoop/sbin/hdfs start datanode
if [[ $1 == "-d" ]]; then
while true; do sleep 1000; done
fi
if [[ $1 == "-bash" ]]; then
/bin/bash
fi
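# Optional sanity check, not part of the original flow: assuming curl in this image was built with
# GSS-API/SPNEGO support, an authenticated WebHDFS listing can be requested after a kinit, e.g.
#   kinit -kt /usr/local/hadoop/etc/hadoop/conf/hdfs.keytab hdfs/kerberizedhdfs1@TEST.CLICKHOUSE.TECH
#   curl --negotiate -u : "http://kerberizedhdfs1:50070/webhdfs/v1/?op=LISTSTATUS"
# A 200 response with a FileStatuses JSON body indicates the namenode accepts Kerberos authentication.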

View File

@ -0,0 +1,139 @@
#!/bin/bash
set -x # trace
: "${REALM:=TEST.CLICKHOUSE.TECH}"
: "${DOMAIN_REALM:=test.clickhouse.tech}"
: "${KERB_MASTER_KEY:=masterkey}"
: "${KERB_ADMIN_USER:=admin}"
: "${KERB_ADMIN_PASS:=admin}"
create_config() {
: "${KDC_ADDRESS:=$(hostname -f)}"
cat>/etc/krb5.conf<<EOF
[logging]
default = FILE:/var/log/kerberos/krb5libs.log
kdc = FILE:/var/log/kerberos/krb5kdc.log
admin_server = FILE:/var/log/kerberos/kadmind.log
[libdefaults]
default_realm = $REALM
dns_lookup_realm = false
dns_lookup_kdc = false
ticket_lifetime = 15d
renew_lifetime = 15d
forwardable = true
# WARNING: We use weaker key types to simplify testing as stronger key types
# require the enhanced security JCE policy file to be installed. You should
# NOT run with this configuration in production or any real environment. You
# have been warned.
default_tkt_enctypes = des-cbc-md5 des-cbc-crc des3-cbc-sha1
default_tgs_enctypes = des-cbc-md5 des-cbc-crc des3-cbc-sha1
permitted_enctypes = des-cbc-md5 des-cbc-crc des3-cbc-sha1
[realms]
$REALM = {
kdc = $KDC_ADDRESS
admin_server = $KDC_ADDRESS
}
[domain_realm]
.$DOMAIN_REALM = $REALM
$DOMAIN_REALM = $REALM
EOF
cat>/var/kerberos/krb5kdc/kdc.conf<<EOF
[kdcdefaults]
kdc_ports = 88
kdc_tcp_ports = 88
[realms]
$REALM = {
acl_file = /var/kerberos/krb5kdc/kadm5.acl
dict_file = /usr/share/dict/words
admin_keytab = /var/kerberos/krb5kdc/kadm5.keytab
# WARNING: We use weaker key types to simplify testing as stronger key types
# require the enhanced security JCE policy file to be installed. You should
# NOT run with this configuration in production or any real environment. You
# have been warned.
master_key_type = des3-hmac-sha1
supported_enctypes = arcfour-hmac:normal des3-hmac-sha1:normal des-cbc-crc:normal des:normal des:v4 des:norealm des:onlyrealm des:afs3
default_principal_flags = +preauth
}
EOF
}
create_db() {
/usr/sbin/kdb5_util -P $KERB_MASTER_KEY -r $REALM create -s
}
start_kdc() {
mkdir -p /var/log/kerberos
/etc/rc.d/init.d/krb5kdc start
/etc/rc.d/init.d/kadmin start
chkconfig krb5kdc on
chkconfig kadmin on
}
restart_kdc() {
/etc/rc.d/init.d/krb5kdc restart
/etc/rc.d/init.d/kadmin restart
}
create_admin_user() {
kadmin.local -q "addprinc -pw $KERB_ADMIN_PASS $KERB_ADMIN_USER/admin"
echo "*/admin@$REALM *" > /var/kerberos/krb5kdc/kadm5.acl
}
create_keytabs() {
# kadmin.local -q "addprinc -randkey hdfs/kerberizedhdfs1.${DOMAIN_REALM}@${REALM}"
# kadmin.local -q "ktadd -norandkey -k /tmp/keytab/hdfs.keytab hdfs/kerberizedhdfs1.${DOMAIN_REALM}@${REALM}"
# kadmin.local -q "addprinc -randkey HTTP/kerberizedhdfs1.${DOMAIN_REALM}@${REALM}"
# kadmin.local -q "ktadd -norandkey -k /tmp/keytab/hdfs.keytab HTTP/kerberizedhdfs1.${DOMAIN_REALM}@${REALM}"
kadmin.local -q "addprinc -randkey hdfs/kerberizedhdfs1@${REALM}"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/hdfs.keytab hdfs/kerberizedhdfs1@${REALM}"
kadmin.local -q "addprinc -randkey HTTP/kerberizedhdfs1@${REALM}"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/hdfs.keytab HTTP/kerberizedhdfs1@${REALM}"
kadmin.local -q "addprinc -randkey hdfsuser/node1@${REALM}"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/clickhouse.keytab hdfsuser/node1@${REALM}"
kadmin.local -q "addprinc -randkey hdfsuser@${REALM}"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/clickhouse.keytab hdfsuser@${REALM}"
kadmin.local -q "addprinc -randkey root@${REALM}"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/clickhouse.keytab root@${REALM}"
kadmin.local -q "addprinc -randkey specuser@${REALM}"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/clickhouse.keytab specuser@${REALM}"
chmod g+r /tmp/keytab/clickhouse.keytab
}
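# Illustrative check, assuming the krb5 client tools are available in this image: the exported
# principals can be listed with
#   klist -kt /tmp/keytab/clickhouse.keytab
# which should show the root@, specuser@, hdfsuser@ and hdfsuser/node1@ entries for TEST.CLICKHOUSE.TECH.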
main() {
if [ ! -f /kerberos_initialized ]; then
create_config
create_db
create_admin_user
start_kdc
touch /kerberos_initialized
fi
if [ ! -f /var/kerberos/krb5kdc/principal ]; then
while true; do sleep 1000; done
else
start_kdc
create_keytabs
tail -F /var/log/kerberos/krb5kdc.log
fi
}
[[ "$0" == "${BASH_SOURCE[0]}" ]] && main "$@"

View File

@ -0,0 +1,24 @@
[logging]
default = FILE:/var/log/krb5libs.log
kdc = FILE:/var/log/krb5kdc.log
admin_server = FILE:/var/log/kadmind.log
[libdefaults]
default_realm = TEST.CLICKHOUSE.TECH
dns_lookup_realm = false
dns_lookup_kdc = false
ticket_lifetime = 15s
forwardable = true
default_tgs_enctypes = des3-hmac-sha1
default_tkt_enctypes = des3-hmac-sha1
permitted_enctypes = des3-hmac-sha1
[realms]
TEST.CLICKHOUSE.TECH = {
kdc = hdfskerberos
admin_server = hdfskerberos
}
[domain_realm]
.test.clickhouse.tech = TEST.CLICKHOUSE.TECH
test.clickhouse.tech = TEST.CLICKHOUSE.TECH

View File

@ -0,0 +1,24 @@
[logging]
default = FILE:/var/log/krb5libs.log
kdc = FILE:/var/log/krb5kdc.log
admin_server = FILE:/var/log/kadmind.log
[libdefaults]
default_realm = TEST.CLICKHOUSE.TECH
dns_lookup_realm = false
dns_lookup_kdc = false
ticket_lifetime = 15d
forwardable = true
default_tgs_enctypes = des3-hmac-sha1
default_tkt_enctypes = des3-hmac-sha1
permitted_enctypes = des3-hmac-sha1
[realms]
TEST.CLICKHOUSE.TECH = {
kdc = hdfskerberos
admin_server = hdfskerberos
}
[domain_realm]
.test.clickhouse.tech = TEST.CLICKHOUSE.TECH
test.clickhouse.tech = TEST.CLICKHOUSE.TECH

View File

@ -0,0 +1,108 @@
import time
import pytest
import os
from helpers.cluster import ClickHouseCluster
import subprocess
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', with_kerberized_hdfs=True, user_configs=[], main_configs=['configs/log_conf.xml', 'configs/hdfs.xml'])
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
except Exception as ex:
print(ex)
raise ex
finally:
cluster.shutdown()
def test_read_table(started_cluster):
data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
started_cluster.hdfs_api.write_data("/simple_table_function", data)
api_read = started_cluster.hdfs_api.read_data("/simple_table_function")
assert api_read == data
select_read = node1.query("select * from hdfs('hdfs://kerberizedhdfs1:9000/simple_table_function', 'TSV', 'id UInt64, text String, number Float64')")
assert select_read == data
def test_read_write_storage(started_cluster):
node1.query("create table SimpleHDFSStorage2 (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9000/simple_storage1', 'TSV')")
node1.query("insert into SimpleHDFSStorage2 values (1, 'Mark', 72.53)")
api_read = started_cluster.hdfs_api.read_data("/simple_storage1")
assert api_read == "1\tMark\t72.53\n"
select_read = node1.query("select * from SimpleHDFSStorage2")
assert select_read == "1\tMark\t72.53\n"
def test_write_storage_not_expired(started_cluster):
node1.query("create table SimpleHDFSStorageNotExpired (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9000/simple_storage_not_expired', 'TSV')")
time.sleep(45) # wait for ticket expiration
node1.query("insert into SimpleHDFSStorageNotExpired values (1, 'Mark', 72.53)")
api_read = started_cluster.hdfs_api.read_data("/simple_storage_not_expired")
assert api_read == "1\tMark\t72.53\n"
select_read = node1.query("select * from SimpleHDFSStorageNotExpired")
assert select_read == "1\tMark\t72.53\n"
def test_two_users(started_cluster):
node1.query("create table HDFSStorOne (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9000/storage_user_one', 'TSV')")
node1.query("insert into HDFSStorOne values (1, 'Real', 86.00)")
node1.query("create table HDFSStorTwo (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://suser@kerberizedhdfs1:9000/user/specuser/storage_user_two', 'TSV')")
node1.query("insert into HDFSStorTwo values (1, 'Ideal', 74.00)")
select_read_1 = node1.query("select * from hdfs('hdfs://kerberizedhdfs1:9000/user/specuser/storage_user_two', 'TSV', 'id UInt64, text String, number Float64')")
select_read_2 = node1.query("select * from hdfs('hdfs://suser@kerberizedhdfs1:9000/storage_user_one', 'TSV', 'id UInt64, text String, number Float64')")
def test_read_table_expired(started_cluster):
data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
started_cluster.hdfs_api.write_data("/simple_table_function_relogin", data)
started_cluster.pause_container('hdfskerberos')
time.sleep(45)
try:
select_read = node1.query("select * from hdfs('hdfs://reloginuser&kerberizedhdfs1:9000/simple_table_function', 'TSV', 'id UInt64, text String, number Float64')")
assert False, "Exception has to be thrown"
except Exception as ex:
assert "DB::Exception: kinit failure:" in str(ex)
started_cluster.unpause_container('hdfskerberos')
def test_prohibited(started_cluster):
node1.query("create table HDFSStorTwoProhibited (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://suser@kerberizedhdfs1:9000/storage_user_two_prohibited', 'TSV')")
try:
node1.query("insert into HDFSStorTwoProhibited values (1, 'SomeOne', 74.00)")
assert False, "Exception has to be thrown"
except Exception as ex:
assert "Unable to open HDFS file: /storage_user_two_prohibited error: Permission denied: user=specuser, access=WRITE" in str(ex)
def test_cache_path(started_cluster):
node1.query("create table HDFSStorCachePath (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://dedicatedcachepath@kerberizedhdfs1:9000/storage_dedicated_cache_path', 'TSV')")
try:
node1.query("insert into HDFSStorCachePath values (1, 'FatMark', 92.53)")
assert False, "Exception has to be thrown"
except Exception as ex:
assert "DB::Exception: hadoop.security.kerberos.ticket.cache.path cannot be set per user" in str(ex)
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")
cluster.shutdown()

View File

@ -0,0 +1,55 @@
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance')
@pytest.fixture(scope="module", autouse=True)
def started_cluster():
try:
cluster.start()
instance.query("CREATE TABLE table1(x UInt32) ENGINE = MergeTree ORDER BY tuple()")
instance.query("CREATE TABLE table2(x UInt32) ENGINE = MergeTree ORDER BY tuple()")
instance.query("INSERT INTO table1 VALUES (1)")
instance.query("INSERT INTO table2 VALUES (2)")
yield cluster
finally:
cluster.shutdown()
@pytest.fixture(autouse=True)
def cleanup_after_test():
try:
yield
finally:
instance.query("DROP USER IF EXISTS A")
def test_merge():
select_query = "SELECT * FROM merge('default', 'table[0-9]+') ORDER BY x"
assert instance.query(select_query) == "1\n2\n"
instance.query("CREATE USER A")
assert "it's necessary to have grant CREATE TEMPORARY TABLE ON *.*" in instance.query_and_get_error(select_query, user = 'A')
instance.query("GRANT CREATE TEMPORARY TABLE ON *.* TO A")
assert "no one matches regular expression" in instance.query_and_get_error(select_query, user = 'A')
instance.query("GRANT SELECT ON default.table1 TO A")
assert instance.query(select_query, user = 'A') == "1\n"
instance.query("GRANT SELECT ON default.* TO A")
assert instance.query(select_query, user = 'A') == "1\n2\n"
instance.query("REVOKE SELECT ON default.table1 FROM A")
assert instance.query(select_query, user = 'A') == "2\n"
instance.query("REVOKE ALL ON default.* FROM A")
instance.query("GRANT SELECT ON default.table1 TO A")
instance.query("GRANT INSERT ON default.table2 TO A")
assert "it's necessary to have grant SELECT ON default.table2" in instance.query_and_get_error(select_query, user = 'A')

View File

@ -4,6 +4,7 @@ import random
import string
import os
import time
from multiprocessing.dummy import Pool
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance('node', main_configs=['configs/enable_test_keeper.xml', 'configs/logs_conf.xml'], with_zookeeper=True)
@ -441,3 +442,84 @@ def test_end_of_watches_session(started_cluster):
zk.close()
except:
pass
def test_concurrent_watches(started_cluster):
fake_zk = get_fake_zk()
fake_zk.restart()
global_path = "/test_concurrent_watches_0"
fake_zk.create(global_path)
dumb_watch_triggered_counter = 0
all_paths_triggered = []
existing_path = []
all_paths_created = []
watches_created = 0
def create_path_and_watch(i):
nonlocal watches_created
nonlocal all_paths_created
fake_zk.ensure_path(global_path + "/" + str(i))
# new function each time
def dumb_watch(event):
nonlocal dumb_watch_triggered_counter
dumb_watch_triggered_counter += 1
nonlocal all_paths_triggered
all_paths_triggered.append(event.path)
fake_zk.get(global_path + "/" + str(i), watch=dumb_watch)
all_paths_created.append(global_path + "/" + str(i))
watches_created += 1
existing_path.append(i)
trigger_called = 0
def trigger_watch(i):
nonlocal trigger_called
trigger_called += 1
fake_zk.set(global_path + "/" + str(i), b"somevalue")
try:
existing_path.remove(i)
except:
pass
def call(total):
for i in range(total):
create_path_and_watch(random.randint(0, 1000))
time.sleep(random.random() % 0.5)
try:
rand_num = random.choice(existing_path)
trigger_watch(rand_num)
except:
pass
while existing_path:
try:
rand_num = random.choice(existing_path)
trigger_watch(rand_num)
except:
pass
p = Pool(10)
arguments = [100] * 10
watches_must_be_created = sum(arguments)
watches_trigger_must_be_called = sum(arguments)
watches_must_be_triggered = sum(arguments)
p.map(call, arguments)
p.close()
# waiting for late watches
for i in range(50):
if dumb_watch_triggered_counter == watches_must_be_triggered:
break
time.sleep(0.1)
assert watches_created == watches_must_be_created
assert trigger_called >= watches_trigger_must_be_called
assert len(existing_path) == 0
if dumb_watch_triggered_counter != watches_must_be_triggered:
print("All created paths", all_paths_created)
print("All triggerred paths", all_paths_triggered)
print("All paths len", len(all_paths_created))
print("All triggered len", len(all_paths_triggered))
print("Diff", list(set(all_paths_created) - set(all_paths_triggered)))
assert dumb_watch_triggered_counter == watches_must_be_triggered

View File

@ -2,7 +2,7 @@
1
1
1
t_00693 Memory 1 [] 1970-01-01 00:00:00 [] [] Memory \N \N
t_00693 Memory 1 [] 1970-01-01 00:00:00 [] [] CREATE TEMPORARY TABLE t_00693 (`x` UInt8) ENGINE = Memory Memory \N \N
1
1
1

View File

@ -1,2 +1,9 @@
CREATE TEMPORARY TABLE tmp_table\n(\n `n` UInt64\n)\nENGINE = Memory AS\nSELECT number AS n\nFROM numbers(42)
CREATE TEMPORARY TABLE tmp_table\n(\n `n` UInt64\n)\nENGINE = Memory AS\nSELECT number AS n\nFROM numbers(42)
CREATE TEMPORARY TABLE tmp_table\n(\n `n` UInt64\n)\nENGINE = Memory AS\nSELECT number AS n\nFROM numbers(42)
42
OK
OK
3 9
0
1

View File

@ -6,20 +6,23 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
url_without_session="https://${CLICKHOUSE_HOST}:${CLICKHOUSE_PORT_HTTPS}/?"
url="${url_without_session}session_id=test_01098"
${CLICKHOUSE_CURL} -m 30 -sSk "$url" --data "CREATE TEMPORARY TABLE tmp_table AS SELECT number AS n FROM numbers(42)" > /dev/null;
${CLICKHOUSE_CURL} -m 30 -sSk "$url" --data "DROP TEMPORARY TABLE IF EXISTS tmp_table"
${CLICKHOUSE_CURL} -m 30 -sSk "$url" --data "CREATE TEMPORARY TABLE tmp_table AS SELECT number AS n FROM numbers(42)"
name_expr="'\`' || database || '\`.\`' || name || '\`'"
full_tmp_name=$(echo "SELECT $name_expr FROM system.tables WHERE database='_temporary_and_external_tables' AND create_table_query LIKE '%tmp_table%'" | ${CLICKHOUSE_CURL} -m 30 -sSgk "$url" -d @-)
id=$(echo "SELECT uuid FROM system.tables WHERE name='tmp_table' AND is_temporary" | ${CLICKHOUSE_CURL} -m 31 -sSgk "$url" -d @-)
internal_table_name="_temporary_and_external_tables.\`_tmp_$id\`"
echo "SELECT * FROM $full_tmp_name" | ${CLICKHOUSE_CURL} -m 60 -sSgk "$url" -d @- | grep -F "Code: 291" > /dev/null && echo "OK"
${CLICKHOUSE_CURL} -m 30 -sSk "$url" --data "SHOW CREATE TEMPORARY TABLE tmp_table"
${CLICKHOUSE_CURL} -m 30 -sSk "$url" --data "SHOW CREATE TABLE $internal_table_name"
${CLICKHOUSE_CURL} -m 30 -sSk "$url_without_session" --data "SHOW CREATE TABLE $internal_table_name"
echo -ne '0\n1\n' | ${CLICKHOUSE_CURL} -m 30 -sSkF 'file=@-' "$url&file_format=CSV&file_types=UInt64&query=SELECT+sum((number+GLOBAL+IN+(SELECT+number+AS+n+FROM+remote('127.0.0.2',+numbers(5))+WHERE+n+GLOBAL+IN+(SELECT+*+FROM+tmp_table)+AND+n+GLOBAL+NOT+IN+(SELECT+*+FROM+file)+))+AS+res),+sum(number*res)+FROM+remote('127.0.0.2',+numbers(10))";
echo "SELECT COUNT() FROM tmp_table" | ${CLICKHOUSE_CURL} -m 60 -sSgk "$url" -d @-
echo "SELECT COUNT() FROM $internal_table_name" | ${CLICKHOUSE_CURL} -m 60 -sSgk "$url" -d @- | grep -F "Code: 291" > /dev/null && echo "OK"
echo "SELECT COUNT() FROM $internal_table_name" | ${CLICKHOUSE_CURL} -m 60 -sSgk "$url_without_session" -d @- | grep -F "Code: 291" > /dev/null && echo "OK"
echo -ne '0\n1\n' | ${CLICKHOUSE_CURL} -m 30 -sSkF 'file=@-' "$url&file_format=CSV&file_types=UInt64&query=SELECT+sum((number+GLOBAL+IN+(SELECT+number+AS+n+FROM+remote('127.0.0.2',+numbers(5))+WHERE+n+GLOBAL+IN+(SELECT+*+FROM+tmp_table)+AND+n+GLOBAL+NOT+IN+(SELECT+*+FROM+file)+))+AS+res),+sum(number*res)+FROM+remote('127.0.0.2',+numbers(10))"
echo -ne '0\n1\n' | ${CLICKHOUSE_CURL} -m 30 -sSkF 'file=@-' "$url&file_format=CSV&file_types=UInt64&query=SELECT+_1%2BsleepEachRow(3)+FROM+file" &
echo -ne '0\n1\n' | ${CLICKHOUSE_CURL} -m 30 -sSkF 'file=@-' "$url&file_format=CSV&file_types=UInt64&query=SELECT+sleepEachRow(3)+FROM+file" > /dev/null &
sleep 1
full_tmp_names=$(echo "SELECT $name_expr FROM system.tables WHERE database='_temporary_and_external_tables' FORMAT TSV" | ${CLICKHOUSE_CURL} -m 30 -sSgk "$url_without_session" -d @-)
for name in $full_tmp_names
do
${CLICKHOUSE_CURL} -m 30 -sSk "${url_without_session}query=SHOW+CREATE+TABLE+$name" 1>/dev/null 2>/dev/null
done;
wait
${CLICKHOUSE_CURL} -m 30 -sSk "$url" --data "DROP TEMPORARY TABLE tmp_table"

View File

@ -0,0 +1,5 @@
CREATE TEMPORARY TABLE tmptable (`x` UInt32) ENGINE = Memory
CREATE TEMPORARY TABLE tmptable (`y` Float64, `z` String) ENGINE = Memory
x UInt32 1
y Float64 1
z String 2

View File

@ -0,0 +1,19 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&query=DROP+TEMPORARY+TABLE+IF+EXISTS+tmptable&session_id=session_1601a"
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&query=DROP+TEMPORARY+TABLE+IF+EXISTS+tmptable&session_id=session_1601b"
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&query=CREATE+TEMPORARY+TABLE+tmptable(x+UInt32)&session_id=session_1601a"
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&query=CREATE+TEMPORARY+TABLE+tmptable(y+Float64,+z+String)&session_id=session_1601b"
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&query=SELECT+create_table_query+FROM+system.tables+WHERE+database=''+AND+name='tmptable'&session_id=session_1601a"
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&query=SELECT+create_table_query+FROM+system.tables+WHERE+database=''+AND+name='tmptable'&session_id=session_1601b"
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&query=SELECT+name,type,position+FROM+system.columns+WHERE+database=''+AND+table='tmptable'&session_id=session_1601a"
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&query=SELECT+name,type,position+FROM+system.columns+WHERE+database=''+AND+table='tmptable'&session_id=session_1601b"
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&query=DROP+TEMPORARY+TABLE+tmptable&session_id=session_1601a"
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&query=DROP+TEMPORARY+TABLE+tmptable&session_id=session_1601b"

View File

@ -0,0 +1,10 @@
test_01602a CREATE TEMPORARY TABLE test_01602a (`x` UInt32) ENGINE = Memory Memory Memory 1
test_01602b CREATE TEMPORARY TABLE test_01602b (`y` Float64, `z` String) ENGINE = Memory Memory Memory 1
test_01602a x UInt32 1 0 0 0 0 0 0 0
test_01602b y Float64 1 0 0 0 0 0 0 0
test_01602b z String 2 0 0 0 0 0 0 0
CREATE TEMPORARY TABLE test_01602a\n(\n `x` UInt32\n)\nENGINE = Memory
CREATE TEMPORARY TABLE test_01602b\n(\n `y` Float64,\n `z` String\n)\nENGINE = Memory
0
0
0

View File

@ -0,0 +1,18 @@
DROP TEMPORARY TABLE IF EXISTS test_01602a;
DROP TEMPORARY TABLE IF EXISTS test_01602b;
CREATE TEMPORARY TABLE test_01602a(x UInt32);
CREATE TEMPORARY TABLE test_01602b(y Float64, z String);
SELECT database, name, create_table_query, engine, engine_full, is_temporary FROM system.tables WHERE name LIKE 'test_01602%' ORDER BY name;
SELECT * FROM system.columns WHERE table LIKE 'test_01602%' ORDER BY table, name;
SHOW CREATE TEMPORARY TABLE test_01602a;
SHOW CREATE TEMPORARY TABLE test_01602b;
SELECT COUNT() FROM system.databases WHERE name='_temporary_and_external_tables';
SELECT COUNT() FROM system.tables WHERE database='_temporary_and_external_tables';
SELECT COUNT() FROM system.columns WHERE database='_temporary_and_external_tables';
DROP TEMPORARY TABLE test_01602a;
DROP TEMPORARY TABLE test_01602b;

View File

@ -706,6 +706,9 @@ def create_as_merge(self, node=None):
with When("I grant CREATE TABLE privilege to a user"):
node.query(f"GRANT CREATE TABLE ON {table_name} TO {user_name}")
with And("I grant SELECT privilege to a user to allow executing the table function merge()"):
node.query(f"GRANT SELECT ON {source_table_name} TO {user_name}")
with Then("I try to create a table as another table"):
node.query(f"CREATE TABLE {table_name} AS merge(default,'{source_table_name}')", settings = [("user", f"{user_name}")])

View File

@ -6,6 +6,7 @@
#include <Common/Stopwatch.h>
#include <Common/ThreadPool.h>
#include <Common/randomSeed.h>
#include <common/getPageSize.h>
#include <cstdlib>
#include <iomanip>
@ -46,7 +47,7 @@ void thread(int fd, int mode, size_t min_offset, size_t max_offset, size_t block
{
using namespace DB;
Memory<> direct_buf(block_size, sysconf(_SC_PAGESIZE));
Memory<> direct_buf(block_size, ::getPageSize());
std::vector<char> simple_buf(block_size);
char * buf;

View File

@ -14,6 +14,7 @@ int main(int, char **) { return 0; }
#include <Common/ThreadPool.h>
#include <Common/Stopwatch.h>
#include <Common/randomSeed.h>
#include <common/getPageSize.h>
#include <pcg_random.hpp>
#include <IO/BufferWithOwnMemory.h>
#include <IO/ReadHelpers.h>
@ -52,7 +53,7 @@ void thread(int fd, int mode, size_t min_offset, size_t max_offset, size_t block
std::vector<Memory<>> buffers(buffers_count);
for (size_t i = 0; i < buffers_count; ++i)
buffers[i] = Memory<>(block_size, sysconf(_SC_PAGESIZE));
buffers[i] = Memory<>(block_size, ::getPageSize());
pcg64_fast rng(randomSeed());

View File

@ -6,10 +6,15 @@
#include <common/LineReader.h>
#include <common/logger_useful.h>
#include <fmt/format.h>
#include <random>
#include <iterator>
#include <algorithm>
#include <chrono>
#include <iostream>
#include <sstream>
#include <exception>
#include <future>
using namespace std;
@ -144,9 +149,98 @@ void testMultiRequest(zkutil::ZooKeeper & zk)
checkEq(zk, "/data/multirequest", "bbb");
}
std::mutex elements_mutex;
std::vector<int> current_elements;
std::atomic<int> watches_triggered = 0;
void triggerWatch(const Coordination::WatchResponse &)
{
watches_triggered++;
}
template<typename Iter, typename RandomGenerator>
Iter select_randomly(Iter start, Iter end, RandomGenerator& g)
{
std::uniform_int_distribution<> dis(0, std::distance(start, end) - 1);
std::advance(start, dis(g));
return start;
}
template<typename Iter>
Iter select_randomly(Iter start, Iter end)
{
static std::random_device rd;
static std::mt19937 gen(rd());
return select_randomly(start, end, gen);
}
std::atomic<int> element_counter = 0;
std::atomic<int> failed_setup_counter = 0;
void createPathAndSetWatch(zkutil::ZooKeeper & zk, const String & path_prefix, size_t total)
{
for (size_t i = 0; i < total; ++i)
{
int element = element_counter++;
zk.createIfNotExists(path_prefix + "/" + std::to_string(element), "");
std::string result;
if (!zk.tryGetWatch(path_prefix + "/" + std::to_string(element), result, nullptr, triggerWatch))
failed_setup_counter++;
{
std::lock_guard lock(elements_mutex);
current_elements.push_back(element);
}
std::this_thread::sleep_for(std::chrono::milliseconds(200));
{
std::lock_guard lock(elements_mutex);
if (current_elements.empty())
continue;
element = *select_randomly(current_elements.begin(), current_elements.end());
current_elements.erase(std::remove(current_elements.begin(), current_elements.end(), element), current_elements.end());
}
zk.tryRemove(path_prefix + "/" + std::to_string(element));
}
}
void tryConcurrentWatches(zkutil::ZooKeeper & zk)
{
std::string path_prefix = "/concurrent_watches";
std::vector<std::future<void>> asyncs;
zk.createIfNotExists(path_prefix, "");
for (size_t i = 0; i < 100; ++i)
{
auto callback = [&zk, path_prefix] ()
{
createPathAndSetWatch(zk, path_prefix, 100);
};
asyncs.push_back(std::async(std::launch::async, callback));
}
for (auto & async : asyncs)
{
async.wait();
}
size_t counter = 0;
while (watches_triggered != 100 * 100)
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
if (counter++ > 20)
break;
}
std::cerr << "Failed setup counter:" << failed_setup_counter << std::endl;
std::cerr << "Current elements size:" << current_elements.size() << std::endl;
std::cerr << "WatchesTriggered:" << watches_triggered << std::endl;
}
int main(int argc, char *argv[])
{
if (argc != 2)
{
std::cerr << "usage: " << argv[0] << " hosts" << std::endl;
@ -168,6 +262,7 @@ int main(int argc, char *argv[])
testMultiRequest(zk);
testCreateSetWatchEvent(zk);
testCreateListWatchEvent(zk);
tryConcurrentWatches(zk);
}
catch (...)
{