Merge branch 'master' of github.com:ClickHouse/ClickHouse into early-exit-interserver-authentication-failure

This commit is contained in:
Alexey Milovidov 2023-11-16 23:40:18 +01:00
commit e49ac4a2b0
32 changed files with 705 additions and 96 deletions

View File

@ -187,9 +187,10 @@ if (NOT CMAKE_BUILD_TYPE_UC STREQUAL "RELEASE")
endif ()
endif()
if (CMAKE_BUILD_TYPE_UC STREQUAL "RELEASE"
OR CMAKE_BUILD_TYPE_UC STREQUAL "RELWITHDEBINFO"
OR CMAKE_BUILD_TYPE_UC STREQUAL "MINSIZEREL")
if (NOT (SANITIZE_COVERAGE OR WITH_COVERAGE)
AND (CMAKE_BUILD_TYPE_UC STREQUAL "RELEASE"
OR CMAKE_BUILD_TYPE_UC STREQUAL "RELWITHDEBINFO"
OR CMAKE_BUILD_TYPE_UC STREQUAL "MINSIZEREL"))
set (OMIT_HEAVY_DEBUG_SYMBOLS_DEFAULT ON)
else()
set (OMIT_HEAVY_DEBUG_SYMBOLS_DEFAULT OFF)
@ -291,9 +292,6 @@ set (CMAKE_C_STANDARD 11)
set (CMAKE_C_EXTENSIONS ON) # required by most contribs written in C
set (CMAKE_C_STANDARD_REQUIRED ON)
# Compiler-specific coverage flags e.g. -fcoverage-mapping
option(WITH_COVERAGE "Profile the resulting binary/binaries" OFF)
if (COMPILER_CLANG)
# Enable C++14 sized global deallocation functions. It should be enabled by setting -std=c++14 but I'm not sure.
# See https://reviews.llvm.org/D112921
@ -309,18 +307,12 @@ if (COMPILER_CLANG)
set(BRANCHES_WITHIN_32B_BOUNDARIES "-mbranches-within-32B-boundaries")
set(COMPILER_FLAGS "${COMPILER_FLAGS} ${BRANCHES_WITHIN_32B_BOUNDARIES}")
endif()
if (WITH_COVERAGE)
set(COMPILER_FLAGS "${COMPILER_FLAGS} -fprofile-instr-generate -fcoverage-mapping")
# If we want to disable coverage for specific translation units
set(WITHOUT_COVERAGE "-fno-profile-instr-generate -fno-coverage-mapping")
endif()
endif ()
set (COMPILER_FLAGS "${COMPILER_FLAGS}")
# Our built-in unwinder only supports DWARF version up to 4.
set (DEBUG_INFO_FLAGS "-g -gdwarf-4")
set (DEBUG_INFO_FLAGS "-g")
# Disable omit frame pointer compiler optimization using -fno-omit-frame-pointer
option(DISABLE_OMIT_FRAME_POINTER "Disable omit frame pointer compiler optimization" OFF)

View File

@ -1,3 +1,5 @@
add_compile_options($<$<OR:$<COMPILE_LANGUAGE:C>,$<COMPILE_LANGUAGE:CXX>>:${COVERAGE_FLAGS}>)
if (USE_CLANG_TIDY)
set (CMAKE_CXX_CLANG_TIDY "${CLANG_TIDY_PATH}")
endif ()

View File

@ -1,11 +1,15 @@
#include "coverage.h"
#if WITH_COVERAGE
#pragma GCC diagnostic ignored "-Wreserved-identifier"
# include <mutex>
# include <unistd.h>
/// WITH_COVERAGE enables the default implementation of code coverage,
/// which dumps a map to the filesystem.
#if WITH_COVERAGE
#include <mutex>
#include <unistd.h>
# if defined(__clang__)
@ -31,3 +35,131 @@ void dumpCoverageReportIfPossible()
#endif
}
/// SANITIZE_COVERAGE enables code instrumentation,
/// but leaves the callbacks implementation to us,
/// which we use to calculate coverage on a per-test basis
/// and to write it to system tables.
#if defined(SANITIZE_COVERAGE)
namespace
{
bool pc_guards_initialized = false;
bool pc_table_initialized = false;
uint32_t * guards_start = nullptr;
uint32_t * guards_end = nullptr;
uintptr_t * coverage_array = nullptr;
size_t coverage_array_size = 0;
uintptr_t * all_addresses_array = nullptr;
size_t all_addresses_array_size = 0;
}
extern "C"
{
/// This is called at least once for every DSO for initialization.
/// But we will use it only for the main DSO.
void __sanitizer_cov_trace_pc_guard_init(uint32_t * start, uint32_t * stop)
{
if (pc_guards_initialized)
return;
pc_guards_initialized = true;
/// The function can be called multiple times, but we need to initialize only once.
if (start == stop || *start)
return;
guards_start = start;
guards_end = stop;
coverage_array_size = stop - start;
/// Note: we will leak this.
coverage_array = static_cast<uintptr_t*>(malloc(sizeof(uintptr_t) * coverage_array_size));
resetCoverage();
}
/// This is called at least once for every DSO for initialization
/// and provides information about all instrumented addresses.
void __sanitizer_cov_pcs_init(const uintptr_t * pcs_begin, const uintptr_t * pcs_end)
{
if (pc_table_initialized)
return;
pc_table_initialized = true;
all_addresses_array_size = pcs_end - pcs_begin;
/// Note: we will leak this as well.
all_addresses_array = static_cast<uintptr_t*>(malloc(sizeof(uintptr_t) * all_addresses_array_size));
/// They are not real pointers, but also contain a flag in the most significant bit,
/// which we are not interested in for now. Reset it.
for (size_t i = 0; i < all_addresses_array_size; ++i)
all_addresses_array[i] = pcs_begin[i] & 0x7FFFFFFFFFFFFFFFULL;
}
/// This is called at every basic block / edge, etc.
void __sanitizer_cov_trace_pc_guard(uint32_t * guard)
{
/// Duplicate the guard check.
if (!*guard)
return;
*guard = 0;
/// If you set *guard to 0 this code will not be called again for this edge.
/// Now we can get the PC and do whatever we want:
/// - store it somewhere or symbolize it and print right away.
/// The values of `*guard` are as you set them in
/// __sanitizer_cov_trace_pc_guard_init and so you can make them consecutive
/// and use them to dereference an array or a bit vector.
void * pc = __builtin_return_address(0);
coverage_array[guard - guards_start] = reinterpret_cast<uintptr_t>(pc);
}
}
__attribute__((no_sanitize("coverage")))
std::span<const uintptr_t> getCoverage()
{
return {coverage_array, coverage_array_size};
}
__attribute__((no_sanitize("coverage")))
std::span<const uintptr_t> getAllInstrumentedAddresses()
{
return {all_addresses_array, all_addresses_array_size};
}
__attribute__((no_sanitize("coverage")))
void resetCoverage()
{
memset(coverage_array, 0, coverage_array_size * sizeof(*coverage_array));
/// The guard defines whether the __sanitizer_cov_trace_pc_guard should be called.
/// For example, you can unset it after first invocation to prevent excessive work.
/// Initially set all the guards to 1 to enable callbacks.
for (uint32_t * x = guards_start; x < guards_end; ++x)
*x = 1;
}
#else
std::span<const uintptr_t> getCoverage()
{
return {};
}
std::span<const uintptr_t> getAllInstrumentedAddresses()
{
return {};
}
void resetCoverage()
{
}
#endif

View File

@ -1,5 +1,8 @@
#pragma once
#include <span>
#include <cstdint>
/// Flush coverage report to file, depending on coverage system
/// proposed by compiler (llvm for clang and gcov for gcc).
///
@ -7,3 +10,16 @@
/// Thread safe (use exclusive lock).
/// Idempotent, may be called multiple times.
void dumpCoverageReportIfPossible();
/// This is effective if SANITIZE_COVERAGE is enabled at build time.
/// Get accumulated unique program addresses of the instrumented parts of the code,
/// seen so far after program startup or after previous reset.
/// The returned span is sparse: it contains mostly zeros, which you should filter out.
std::span<const uintptr_t> getCoverage();
/// Get all instrumented addresses that could be in the coverage.
std::span<const uintptr_t> getAllInstrumentedAddresses();
/// Reset the accumulated coverage.
/// This is useful to compare coverage of different tests, including differential coverage.
void resetCoverage();

View File

@ -1,5 +1,6 @@
#include "memcpy.h"
__attribute__((no_sanitize("coverage")))
extern "C" void * memcpy(void * __restrict dst, const void * __restrict src, size_t size)
{
return inline_memcpy(dst, src, size);

View File

@ -93,7 +93,7 @@
* See https://habr.com/en/company/yandex/blog/457612/
*/
__attribute__((no_sanitize("coverage")))
static inline void * inline_memcpy(void * __restrict dst_, const void * __restrict src_, size_t size)
{
/// We will use pointer arithmetic, so char pointer will be used.

View File

@ -58,3 +58,27 @@ if (SANITIZE)
message (FATAL_ERROR "Unknown sanitizer type: ${SANITIZE}")
endif ()
endif()
# Default coverage instrumentation (dumping the coverage map on exit)
option(WITH_COVERAGE "Instrumentation for code coverage with default implementation" OFF)
if (WITH_COVERAGE)
message (INFORMATION "Enabled instrumentation for code coverage")
set(COVERAGE_FLAGS "-fprofile-instr-generate -fcoverage-mapping")
endif()
option (SANITIZE_COVERAGE "Instrumentation for code coverage with custom callbacks" OFF)
if (SANITIZE_COVERAGE)
message (INFORMATION "Enabled instrumentation for code coverage")
# We set this define for whole build to indicate that at least some parts are compiled with coverage.
# And to expose it in system.build_options.
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DSANITIZE_COVERAGE=1")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DSANITIZE_COVERAGE=1")
# But the actual coverage will be enabled on per-library basis: for ClickHouse code, but not for 3rd-party.
set (COVERAGE_FLAGS "-fsanitize-coverage=trace-pc-guard,pc-table")
endif()
set (WITHOUT_COVERAGE_FLAGS "-fno-profile-instr-generate -fno-coverage-mapping -fno-sanitize-coverage=trace-pc-guard,pc-table")
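Because the define goes into the global compiler flags, an instrumented build can also be detected from SQL at runtime. A rough check (a sketch, assuming the flags are exposed under the usual CXX_FLAGS name in system.build_options, which the test-runner changes below rely on):
SELECT value LIKE '%-DSANITIZE_COVERAGE=1%' AS built_with_sanitize_coverage
FROM system.build_options
WHERE name = 'CXX_FLAGS';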

View File

@ -3,15 +3,6 @@
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -w -ffunction-sections -fdata-sections")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -w -ffunction-sections -fdata-sections")
if (WITH_COVERAGE)
set (WITHOUT_COVERAGE_LIST ${WITHOUT_COVERAGE})
separate_arguments(WITHOUT_COVERAGE_LIST)
# disable coverage for contib files and build with optimisations
if (COMPILER_CLANG)
add_compile_options(-O3 -DNDEBUG -finline-functions -finline-hint-functions ${WITHOUT_COVERAGE_LIST})
endif()
endif()
if (SANITIZE STREQUAL "undefined")
# 3rd-party libraries usually not intended to work with UBSan.
add_compile_options(-fno-sanitize=undefined)

View File

@ -366,38 +366,6 @@ else ()
COMMAND_ECHO STDOUT)
endif ()
# add_custom_command (
# OUTPUT ${PROTOC_BUILD_DIR}
# COMMAND mkdir -p ${PROTOC_BUILD_DIR})
#
# add_custom_command (
# OUTPUT "${PROTOC_BUILD_DIR}/CMakeCache.txt"
#
# COMMAND ${CMAKE_COMMAND}
# -G"${CMAKE_GENERATOR}"
# -DCMAKE_MAKE_PROGRAM="${CMAKE_MAKE_PROGRAM}"
# -DCMAKE_C_COMPILER="${CMAKE_C_COMPILER}"
# -DCMAKE_CXX_COMPILER="${CMAKE_CXX_COMPILER}"
# -Dprotobuf_BUILD_TESTS=0
# -Dprotobuf_BUILD_CONFORMANCE=0
# -Dprotobuf_BUILD_EXAMPLES=0
# -Dprotobuf_BUILD_PROTOC_BINARIES=1
# "${protobuf_source_dir}/cmake"
#
# DEPENDS "${PROTOC_BUILD_DIR}"
# WORKING_DIRECTORY "${PROTOC_BUILD_DIR}"
# COMMENT "Configuring 'protoc' for host architecture."
# USES_TERMINAL)
#
# add_custom_command (
# OUTPUT "${PROTOC_BUILD_DIR}/protoc"
# COMMAND ${CMAKE_COMMAND} --build "${PROTOC_BUILD_DIR}"
# DEPENDS "${PROTOC_BUILD_DIR}/CMakeCache.txt"
# COMMENT "Building 'protoc' for host architecture."
# USES_TERMINAL)
#
# add_custom_target (protoc-host DEPENDS "${PROTOC_BUILD_DIR}/protoc")
add_executable(protoc IMPORTED GLOBAL)
set_target_properties (protoc PROPERTIES IMPORTED_LOCATION "${PROTOC_BUILD_DIR}/protoc")
add_dependencies(protoc "${PROTOC_BUILD_DIR}/protoc")

View File

@ -6,9 +6,9 @@ sidebar_label: Random Numbers
# Functions for Generating Random Numbers
All functions in this section accept zero or one arguments. The only use of the argument (if provided) is to prevent prevent [common subexpression
elimination](../../sql-reference/functions/index.md#common-subexpression-elimination) such that two different execution of the same random
function in a query return different random values.
All functions in this section accept zero or one arguments. The only use of the argument (if provided) is to prevent [common subexpression
elimination](../../sql-reference/functions/index.md#common-subexpression-elimination) such that two different executions within a row of the same random
function return different random values.
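A minimal illustration of the behaviour described above (a sketch; concrete values differ between runs):
-- Identical calls may be merged by common subexpression elimination:
SELECT rand() = rand();    -- can return 1
-- Distinct dummy arguments keep the two calls separate:
SELECT rand(1) = rand(2);  -- almost certainly 0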
Related content
- Blog: [Generating random data in ClickHouse](https://clickhouse.com/blog/generating-random-test-distribution-data-for-clickhouse)

View File

@ -1,3 +1,5 @@
add_compile_options($<$<OR:$<COMPILE_LANGUAGE:C>,$<COMPILE_LANGUAGE:CXX>>:${COVERAGE_FLAGS}>)
if (USE_CLANG_TIDY)
set (CMAKE_CXX_CLANG_TIDY "${CLANG_TIDY_PATH}")
endif ()

View File

@ -676,6 +676,10 @@ try
global_context->addWarningMessage("Server was built with sanitizer. It will work slowly.");
#endif
#if defined(SANITIZE_COVERAGE) || WITH_COVERAGE
global_context->addWarningMessage("Server was built with code coverage. It will work slowly.");
#endif
const size_t physical_server_memory = getMemoryAmount();
LOG_INFO(log, "Available RAM: {}; physical cores: {}; logical cores: {}.",

View File

@ -1,3 +1,5 @@
add_compile_options($<$<OR:$<COMPILE_LANGUAGE:C>,$<COMPILE_LANGUAGE:CXX>>:${COVERAGE_FLAGS}>)
if (USE_INCLUDE_WHAT_YOU_USE)
set (CMAKE_CXX_INCLUDE_WHAT_YOU_USE ${IWYU_PATH})
endif ()
@ -293,7 +295,8 @@ set_source_files_properties(
Common/Elf.cpp
Common/Dwarf.cpp
Common/SymbolIndex.cpp
PROPERTIES COMPILE_FLAGS "-O2 ${WITHOUT_COVERAGE}")
Common/ThreadFuzzer.cpp
PROPERTIES COMPILE_FLAGS "-O2 ${WITHOUT_COVERAGE_FLAGS}")
target_link_libraries (clickhouse_common_io
PRIVATE

View File

@ -46,8 +46,8 @@ class Elf;
* can parse Debug Information Entries (DIEs), abbreviations, attributes (of
* all forms), and we can interpret bytecode for the line number VM.
*
* We can interpret DWARF records of version 2, 3, or 4, although we don't
* actually support many of the version 4 features (such as VLIW, multiple
* We can interpret DWARF records of version 2, 3, 4, or 5, although we don't
* actually support many of the features of versions 4 and 5 (such as VLIW, multiple
* operations per instruction)
*
* Note that the DWARF record parser does not allocate heap memory at all.

View File

@ -416,7 +416,7 @@ private:
std::to_integer<UInt32>(bytes.front()) & MAX_ZERO_BYTE_COUNT);
if (zero_byte_count1 > VALUE_SIZE || zero_byte_count2 > VALUE_SIZE) [[unlikely]]
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Invalid compressed data");
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Invalid zero byte count(s): {} and {}", zero_byte_count1, zero_byte_count2);
size_t tail_size1 = VALUE_SIZE - zero_byte_count1;
size_t tail_size2 = VALUE_SIZE - zero_byte_count2;
@ -424,7 +424,7 @@ private:
size_t expected_size = 0;
if (__builtin_add_overflow(tail_size1, tail_size2, &expected_size)
|| __builtin_add_overflow(expected_size, 1, &expected_size)) [[unlikely]]
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Invalid compressed data");
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Overflow occurred while calculating expected size");
if (bytes.size() < expected_size) [[unlikely]]
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Unexpected end of encoded sequence");

View File

@ -1083,12 +1083,14 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
}
LOG_INFO(log, "All tables are created successfully");
if (max_log_ptr_at_creation != 0)
chassert(max_log_ptr_at_creation || our_log_ptr);
UInt32 first_entry_to_mark_finished = new_replica ? max_log_ptr_at_creation : our_log_ptr;
if (first_entry_to_mark_finished)
{
/// If the replica is new and some of the queries applied during recovery
/// were issued after the replica was created, then other nodes might be
/// waiting for this node to notify them that the query was applied.
for (UInt32 ptr = max_log_ptr_at_creation; ptr <= max_log_ptr; ++ptr)
for (UInt32 ptr = first_entry_to_mark_finished; ptr <= max_log_ptr; ++ptr)
{
auto entry_name = DDLTaskBase::getLogEntryName(ptr);
auto path = fs::path(zookeeper_path) / "log" / entry_name / "finished" / getFullReplicaName();

View File

@ -0,0 +1,94 @@
#if defined(SANITIZE_COVERAGE)
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnConst.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
#include <Interpreters/Context.h>
#include <base/coverage.h>
namespace DB
{
namespace
{
enum class Kind
{
Current,
All
};
/** If ClickHouse is built with coverage instrumentation, returns an array
* of currently accumulated (`coverage`) / all possible (`coverageAll`) unique code addresses.
*/
class FunctionCoverage : public IFunction
{
private:
Kind kind;
public:
String getName() const override
{
return kind == Kind::Current ? "coverage" : "coverageAll";
}
explicit FunctionCoverage(Kind kind_) : kind(kind_)
{
}
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override
{
return false;
}
size_t getNumberOfArguments() const override
{
return 0;
}
bool isDeterministic() const override
{
return false;
}
DataTypePtr getReturnTypeImpl(const DataTypes & /*arguments*/) const override
{
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>());
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName &, const DataTypePtr &, size_t input_rows_count) const override
{
auto coverage_table = kind == Kind::Current ? getCoverage() : getAllInstrumentedAddresses();
auto column_addresses = ColumnUInt64::create();
auto & data = column_addresses->getData();
for (auto ptr : coverage_table)
if (ptr)
data.push_back(ptr);
auto column_array = ColumnArray::create(
std::move(column_addresses),
ColumnArray::ColumnOffsets::create(1, data.size()));
return ColumnConst::create(std::move(column_array), input_rows_count);
}
};
}
REGISTER_FUNCTION(Coverage)
{
factory.registerFunction("coverage", [](ContextPtr){ return std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionCoverage>(Kind::Current)); });
factory.registerFunction("coverageAll", [](ContextPtr){ return std::make_unique<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionCoverage>(Kind::All)); });
}
}
#endif
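On a server built with SANITIZE_COVERAGE these functions can be queried directly. A usage sketch (the symbolization part assumes the existing introspection functions and allow_introspection_functions = 1):
-- Unique instrumented addresses hit since startup or the last reset:
SELECT length(coverage());

-- Fraction of all instrumented addresses covered so far:
SELECT length(coverage()) / length(coverageAll());

-- Group the covered addresses by the function they belong to:
SELECT demangle(addressToSymbol(arrayJoin(coverage()))) AS func, count() AS addresses
GROUP BY func
ORDER BY addresses DESC
LIMIT 10;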

View File

@ -57,6 +57,7 @@
#include <Parsers/ASTSystemQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Common/ThreadFuzzer.h>
#include <base/coverage.h>
#include <csignal>
#include <algorithm>
#include <unistd.h>
@ -687,6 +688,12 @@ BlockIO InterpreterSystemQuery::execute()
FailPointInjection::disableFailPoint(query.fail_point_name);
break;
}
case Type::RESET_COVERAGE:
{
getContext()->checkAccess(AccessType::SYSTEM);
resetCoverage();
break;
}
default:
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown type of SYSTEM query");
}
@ -1301,6 +1308,7 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
case Type::START_THREAD_FUZZER:
case Type::ENABLE_FAILPOINT:
case Type::DISABLE_FAILPOINT:
case Type::RESET_COVERAGE:
case Type::UNKNOWN:
case Type::END: break;
}
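This is the statement clickhouse-test issues between tests (see the changes to tests/clickhouse-test below). A manual per-test workflow might look like this sketch (system.coverage here is the table created by the test runner, not a built-in table):
SYSTEM RESET COVERAGE;
-- ... run the queries of interest ...
INSERT INTO system.coverage SELECT now(), 'my_test', coverage();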

View File

@ -53,6 +53,18 @@ bool PredicateExpressionsOptimizer::optimize(ASTSelectQuery & select_query)
return false;
}
static bool hasInputTableFunction(const ASTPtr & expr)
{
if (const auto * func = typeid_cast<const ASTFunction *>(expr.get()); func && func->name == "input")
return true;
for (const auto & child : expr->children)
if (hasInputTableFunction(child))
return true;
return false;
}
std::vector<ASTs> PredicateExpressionsOptimizer::extractTablesPredicates(const ASTPtr & where, const ASTPtr & prewhere)
{
std::vector<ASTs> tables_predicates(tables_with_columns.size());
@ -72,6 +84,11 @@ std::vector<ASTs> PredicateExpressionsOptimizer::extractTablesPredicates(const A
return {}; /// Not optimized when the predicate contains a stateful function, a non-deterministic function, or window functions
}
/// Skip predicate like `... IN (SELECT ... FROM input())` because
/// it can be duplicated but we can't execute `input()` twice.
if (hasInputTableFunction(predicate_expression))
return {};
if (!expression_info.is_array_join)
{
if (expression_info.unique_reference_tables_pos.size() == 1)

View File

@ -86,6 +86,7 @@ public:
START_PULLING_REPLICATION_LOG,
STOP_CLEANUP,
START_CLEANUP,
RESET_COVERAGE,
END
};

View File

@ -448,14 +448,14 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
}
case Type::DROP_FORMAT_SCHEMA_CACHE:
{
if (ParserKeyword{"FOR"}.ignore(pos, expected))
{
if (ParserKeyword{"Protobuf"}.ignore(pos, expected))
res->schema_cache_format = "Protobuf";
else
return false;
}
break;
}
case Type::UNFREEZE:
{

View File

@ -5,8 +5,11 @@
#include <memory>
#include <Processors/ISource.h>
#include <Processors/Sources/ThrowingExceptionSource.h>
#include <Processors/QueryPlan/ISourceStep.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
namespace DB
{
@ -14,6 +17,7 @@ namespace DB
namespace ErrorCodes
{
extern const int INVALID_USAGE_OF_INPUT;
extern const int LOGICAL_ERROR;
}
StorageInput::StorageInput(const StorageID & table_id, const ColumnsDescription & columns_)
@ -47,11 +51,33 @@ public:
void StorageInput::setPipe(Pipe pipe_)
{
pipe = std::move(pipe_);
was_pipe_initialized = true;
}
class ReadFromInput : public ISourceStep
{
public:
std::string getName() const override { return "ReadFromInput"; }
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
Pipe StorageInput::read(
const Names & /*column_names*/,
ReadFromInput(
Block sample_block,
Pipe pipe_,
StorageInput & storage_)
: ISourceStep(DataStream{.header = std::move(sample_block)})
, pipe(std::move(pipe_))
, storage(storage_)
{
}
private:
Pipe pipe;
StorageInput & storage;
};
void StorageInput::read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & /*query_info*/,
ContextPtr context,
@ -59,20 +85,43 @@ Pipe StorageInput::read(
size_t /*max_block_size*/,
size_t /*num_streams*/)
{
Pipes pipes;
storage_snapshot->check(column_names);
Block sample_block = storage_snapshot->metadata->getSampleBlock();
Pipe input_source_pipe;
auto query_context = context->getQueryContext();
/// It is a TCP request if we have callbacks for input().
if (query_context->getInputBlocksReaderCallback())
{
/// Send structure to the client.
query_context->initializeInput(shared_from_this());
return Pipe(std::make_shared<StorageInputSource>(query_context, storage_snapshot->metadata->getSampleBlock()));
input_source_pipe = Pipe(std::make_shared<StorageInputSource>(query_context, sample_block));
}
if (pipe.empty())
auto reading = std::make_unique<ReadFromInput>(
std::move(sample_block),
std::move(input_source_pipe),
*this);
query_plan.addStep(std::move(reading));
}
void ReadFromInput::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
if (!pipe.empty())
{
pipeline.init(std::move(pipe));
return;
}
if (!storage.was_pipe_initialized)
throw Exception(ErrorCodes::INVALID_USAGE_OF_INPUT, "Input stream is not initialized, input() must be used only in INSERT SELECT query");
return std::move(pipe);
if (storage.was_pipe_used)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to read from input() twice.");
pipeline.init(std::move(storage.pipe));
storage.was_pipe_used = true;
}
}
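For context, input() is only valid as the source of an INSERT ... SELECT, where the client streams the data for it. A minimal usage sketch (target_table and the column structure are illustrative):
INSERT INTO target_table
SELECT a, b * 2
FROM input('a UInt32, b UInt64')
FORMAT CSV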

View File

@ -10,6 +10,7 @@ namespace DB
class StorageInput final : public IStorage
{
friend class ReadFromInput;
public:
StorageInput(const StorageID & table_id, const ColumnsDescription & columns_);
@ -18,7 +19,8 @@ public:
/// A table will read from this stream.
void setPipe(Pipe pipe_);
Pipe read(
void read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
@ -29,5 +31,7 @@ public:
private:
Pipe pipe;
bool was_pipe_initialized = false;
bool was_pipe_used = false;
};
}

View File

@ -8972,7 +8972,7 @@ void StorageReplicatedMergeTree::createTableSharedID() const
else if (code == Coordination::Error::ZNONODE) /// table completely dropped, we can choose any id we want
{
id = toString(UUIDHelpers::Nil);
LOG_DEBUG(log, "Table was completely drop, we can use anything as ID (will use {})", id);
LOG_DEBUG(log, "Table was completely dropped, and we can use anything as ID (will use {})", id);
}
else if (code != Coordination::Error::ZOK)
{

tests/ci/worker/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
generated_*init_runner.sh

View File

@ -0,0 +1,85 @@
#!/usr/bin/env bash
usage() {
echo "Usage: $0 ENVIRONMENT" >&2
echo "Valid values for ENVIRONMENT: staging, production" >&2
exit 1
}
case "$1" in
staging|production)
ENVIRONMENT="$1" ;;
--help)
usage ;;
*)
echo "Invalid argument" >&2
usage ;;
esac
cd "$(dirname "$0")" || exit 1
SOURCE_SCRIPT='init_runner.sh'
check_response() {
# Are we even in the interactive shell?
[ -t 1 ] || return 1
local request
request="$1"
read -rp "$request (y/N): " response
case "$response" in
[Yy])
return 0
# Your code to continue goes here
;;
*)
return 1
;;
esac
}
check_dirty() {
if [ -n "$(git status --porcelain=v2 "$SOURCE_SCRIPT")" ]; then
echo "The $SOURCE_SCRIPT has uncommitted changes, won't deploy it" >&2
exit 1
fi
}
GIT_HASH=$(git log -1 --format=format:%H)
header() {
cat << EOF
#!/usr/bin/env bash
echo 'The $ENVIRONMENT script is generated from $SOURCE_SCRIPT, commit $GIT_HASH'
EOF
}
body() {
local first_line
first_line=$(sed -n '/^# THE SCRIPT START$/{=;q}' "$SOURCE_SCRIPT")
if [ -z "$first_line" ]; then
echo "The pattern '# THE SCRIPT START' is not found in $SOURCE_SCRIPT" >&2
exit 1
fi
tail "+$first_line" "$SOURCE_SCRIPT"
}
GENERATED_FILE="generated_${ENVIRONMENT}_${SOURCE_SCRIPT}"
{ header && body; } > "$GENERATED_FILE"
echo "The file $GENERATED_FILE is generated"
if check_response "Display the content of $GENERATED_FILE?"; then
if [ -z "$PAGER" ]; then
less "$GENERATED_FILE"
else
$PAGER "$GENERATED_FILE"
fi
fi
check_dirty
S3_OBJECT=${S3_OBJECT:-s3://github-runners-data/cloud-init/${ENVIRONMENT}.sh}
if check_response "Deploy the generated script to $S3_OBJECT?"; then
aws s3 mv "$GENERATED_FILE" "$S3_OBJECT"
fi

View File

@ -1,4 +1,46 @@
#!/usr/bin/env bash
cat > /dev/null << 'EOF'
The following content is embedded into the s3 object via the script
deploy-runner-init.sh {staging,production}
along with additional helper information.
In the `user data` you should define the following text
between `### COPY BELOW` and `### COPY ABOVE`
### COPY BELOW
Content-Type: multipart/mixed; boundary="//"
MIME-Version: 1.0
--//
Content-Type: text/cloud-config; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
Content-Disposition: attachment; filename="cloud-config.txt"
#cloud-config
cloud_final_modules:
- [scripts-user, always]
--//
Content-Type: text/x-shellscript; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
Content-Disposition: attachment; filename="userdata.txt"
#!/bin/bash
INSTANCE_ID=$(ec2metadata --instance-id)
INIT_ENVIRONMENT=$(/usr/local/bin/aws ec2 describe-tags --filters "Name=resource-id,Values=$INSTANCE_ID" --query "Tags[?Key=='github:init-environment'].Value" --output text)
echo "Downloading and using $INIT_ENVIRONMENT cloud-init.sh"
aws s3 cp "s3://github-runners-data/cloud-init/${INIT_ENVIRONMENT:-production}.sh" /tmp/cloud-init.sh
chmod 0700 /tmp/cloud-init.sh
exec bash /tmp/cloud-init.sh
--//
### COPY ABOVE
EOF
# THE SCRIPT START
set -uo pipefail
####################################
@ -88,6 +130,23 @@ terminate_and_exit() {
declare -f terminate_and_exit >> /tmp/actions-hooks/common.sh
check_spot_instance_is_old() {
# This function should be executed ONLY BETWEEN runs of the runner.
# It's unsafe to execute while the runner is working!
local LIFE_CYCLE
LIFE_CYCLE=$(curl -s --fail http://169.254.169.254/latest/meta-data/instance-life-cycle)
if [ "$LIFE_CYCLE" == "spot" ]; then
local UPTIME
UPTIME=$(< /proc/uptime)
UPTIME=${UPTIME%%.*}
if (( 3600 < UPTIME )); then
echo "The spot instance has uptime $UPTIME, it's time to shut it down"
return 0
fi
fi
return 1
}
check_proceed_spot_termination() {
# The function checks for a spot instance termination notice and proceeds with termination if one exists
# The event for spot instance termination
@ -104,7 +163,7 @@ check_proceed_spot_termination() {
if [ -n "$runner_pid" ]; then
# Kill the runner to not allow it cancelling the job
# shellcheck disable=SC2046
kill -9 $(list_children "$runner_pid")
kill -9 "$runner_pid" $(list_children "$runner_pid")
fi
sudo -u ubuntu ./config.sh remove --token "$(get_runner_token)"
terminate_and_exit
@ -119,6 +178,7 @@ no_terminating_metadata() {
# The event for rebalance recommendation. Not strict, so we have some room to make a decision here
if curl -s --fail http://169.254.169.254/latest/meta-data/events/recommendations/rebalance; then
echo 'Received recommendation to rebalance, checking the uptime'
local UPTIME
UPTIME=$(< /proc/uptime)
UPTIME=${UPTIME%%.*}
# We don't shutdown the instances younger than 30m
@ -260,14 +320,17 @@ while true; do
# If runner is not active, check that it needs to terminate itself
echo "Checking if the instance is supposed to terminate"
no_terminating_metadata || terminate_on_event
check_spot_instance_is_old && terminate_and_exit
check_proceed_spot_termination
echo "Going to configure runner"
sudo -u ubuntu ./config.sh --url $RUNNER_URL --token "$(get_runner_token)" --ephemeral \
sudo -u ubuntu ./config.sh --url $RUNNER_URL --token "$(get_runner_token)" \
--ephemeral --disableupdate --unattended \
--runnergroup Default --labels "$LABELS" --work _work --name "$INSTANCE_ID"
echo "One more check to avoid a race between runner and infrastructure"
no_terminating_metadata || terminate_on_event
check_spot_instance_is_old && terminate_and_exit
check_proceed_spot_termination
echo "Run"
@ -275,7 +338,7 @@ while true; do
ACTIONS_RUNNER_HOOK_JOB_STARTED=/tmp/actions-hooks/pre-run.sh \
ACTIONS_RUNNER_HOOK_JOB_COMPLETED=/tmp/actions-hooks/post-run.sh \
./run.sh &
sleep 15
sleep 10
else
echo "Runner is working with pid $runner_pid, checking the metadata in background"
check_proceed_spot_termination
@ -291,8 +354,8 @@ while true; do
terminate_and_exit
fi
fi
sleep 5
fi
sleep 5
done
# vim:ts=4:sw=4

View File

@ -9,7 +9,7 @@ set -xeuo pipefail
echo "Running prepare script"
export DEBIAN_FRONTEND=noninteractive
export RUNNER_VERSION=2.304.0
export RUNNER_VERSION=2.311.0
export RUNNER_HOME=/home/ubuntu/actions-runner
deb_arch() {
@ -56,12 +56,12 @@ apt-get install --yes --no-install-recommends \
qemu-user-static \
unzip
# Install docker
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg
echo "deb [arch=$(deb_arch) signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" | tee /etc/apt/sources.list.d/docker.list > /dev/null
apt-get update
apt-get install --yes --no-install-recommends docker-ce docker-buildx-plugin docker-ce-cli containerd.io
usermod -aG docker ubuntu
@ -81,6 +81,14 @@ cat <<EOT > /etc/docker/daemon.json
}
EOT
# Install azure-cli
curl -sLS https://packages.microsoft.com/keys/microsoft.asc | gpg --dearmor -o /etc/apt/keyrings/microsoft.gpg
AZ_DIST=$(lsb_release -cs)
echo "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/microsoft.gpg] https://packages.microsoft.com/repos/azure-cli/ $AZ_DIST main" | tee /etc/apt/sources.list.d/azure-cli.list
apt-get update
apt-get install --yes --no-install-recommends azure-cli
# Increase the limit on the number of virtual memory mappings to avoid 'Cannot mmap' error
echo "vm.max_map_count = 2097152" > /etc/sysctl.d/01-increase-map-counts.conf
@ -88,10 +96,12 @@ systemctl restart docker
# buildx builder is user-specific
sudo -u ubuntu docker buildx version
sudo -u ubuntu docker buildx rm default-builder || : # if it's the second attempt
sudo -u ubuntu docker buildx create --use --name default-builder
pip install boto3 pygithub requests urllib3 unidiff dohq-artifactory
rm -rf $RUNNER_HOME # if it's the second attempt
mkdir -p $RUNNER_HOME && cd $RUNNER_HOME
RUNNER_ARCHIVE="actions-runner-linux-$(runner_arch)-$RUNNER_VERSION.tar.gz"
@ -130,3 +140,44 @@ systemctl enable amazon-cloudwatch-agent.service
# The following line is used in aws TOE check.
touch /var/tmp/clickhouse-ci-ami.success
# END OF THE SCRIPT
# TOE description
# name: CIInfrastructurePrepare
# description: installs the infrastructure for ClickHouse CI runners
# schemaVersion: 1.0
#
# phases:
# - name: build
# steps:
# - name: DownloadRemoteScript
# maxAttempts: 3
# action: WebDownload
# onFailure: Abort
# inputs:
# - source: https://github.com/ClickHouse/ClickHouse/raw/653da5f00219c088af66d97a8f1ea3e35e798268/tests/ci/worker/prepare-ci-ami.sh
# destination: /tmp/prepare-ci-ami.sh
# - name: RunScript
# maxAttempts: 3
# action: ExecuteBash
# onFailure: Abort
# inputs:
# commands:
# - bash -x '{{build.DownloadRemoteScript.inputs[0].destination}}'
#
#
# - name: validate
# steps:
# - name: RunScript
# maxAttempts: 3
# action: ExecuteBash
# onFailure: Abort
# inputs:
# commands:
# - ls /var/tmp/clickhouse-ci-ami.success
# - name: Cleanup
# action: DeleteFile
# onFailure: Abort
# maxAttempts: 3
# inputs:
# - path: /var/tmp/clickhouse-ci-ami.success

View File

@ -1173,6 +1173,25 @@ class TestCase:
description_full += result.reason.value
description_full += result.description
if (
args.collect_per_test_coverage
and BuildFlags.SANITIZE_COVERAGE in args.build_flags
):
clickhouse_execute(
args,
f"INSERT INTO system.coverage SELECT now(), '{self.case}', coverage()",
retry_error_codes=True,
)
coverage = clickhouse_execute(
args,
"SELECT length(coverage())",
retry_error_codes=True,
).decode()
description_full += f" Coverage: {coverage}"
description_full += "\n"
if result.status == TestStatus.FAIL and self.testcase_args:
@ -1231,6 +1250,17 @@ class TestCase:
+ pattern
)
# We want to calculate per-test code coverage. That's why we reset it before each test.
if (
args.collect_per_test_coverage
and BuildFlags.SANITIZE_COVERAGE in args.build_flags
):
clickhouse_execute(
args,
"SYSTEM RESET COVERAGE",
retry_error_codes=True,
)
command = pattern.format(**params)
proc = Popen(command, shell=True, env=os.environ)
@ -1872,6 +1902,7 @@ class BuildFlags:
UNDEFINED = "ubsan"
MEMORY = "msan"
DEBUG = "debug"
SANITIZE_COVERAGE = "sanitize-coverage"
RELEASE = "release"
ORDINARY_DATABASE = "ordinary-database"
POLYMORPHIC_PARTS = "polymorphic-parts"
@ -1891,6 +1922,8 @@ def collect_build_flags(args):
result.append(BuildFlags.UNDEFINED)
elif b"-fsanitize=memory" in value:
result.append(BuildFlags.MEMORY)
elif b"-DSANITIZE_COVERAGE=1" in value:
result.append(BuildFlags.SANITIZE_COVERAGE)
value = clickhouse_execute(
args, "SELECT value FROM system.build_options WHERE name = 'BUILD_TYPE'"
@ -2072,6 +2105,8 @@ def reportCoverageFor(args, what, query, permissive=False):
return True
# This is high-level coverage on a per-component basis (functions, data types, etc.)
# Not to be confused with the code coverage.
def reportCoverage(args):
clickhouse_execute(args, "SYSTEM FLUSH LOGS")
@ -2334,6 +2369,28 @@ def main(args):
print(f"Failed to create databases for tests: {e}")
server_died.set()
if (
args.collect_per_test_coverage
and BuildFlags.SANITIZE_COVERAGE in args.build_flags
):
clickhouse_execute(
args,
"""
CREATE TABLE IF NOT EXISTS system.coverage
(
time DateTime,
test_name String,
coverage Array(UInt64)
) ENGINE = MergeTree ORDER BY test_name;
""",
)
# Coverage collected at the system startup before running any tests:
clickhouse_execute(
args,
"INSERT INTO system.coverage SELECT now(), '', coverage()",
)
total_tests_run = 0
for suite in sorted(os.listdir(base_dir), key=suite_key_func):
@ -2678,6 +2735,12 @@ def parse_args():
default=False,
help="Check what high-level server components were covered by tests",
)
parser.add_argument(
"--collect-per-test-coverage",
action="store_true",
default=False,
help="Create `system.coverage` table on the server and collect information about low-level code coverage on a per test basis there",
)
parser.add_argument(
"--report-logs-stats",
action="store_true",

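After a run with --collect-per-test-coverage, the collected data can be inspected directly. A sketch of a follow-up query against the table created above (the row with an empty test_name is the coverage accumulated before any test ran):
SELECT time, test_name, length(coverage) AS covered_addresses
FROM system.coverage
ORDER BY time;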
View File

@ -27,10 +27,6 @@
<query>SELECT avg(num_f) FROM perf_avg FORMAT Null</query>
<query>SELECT avgWeighted(num_f, num) FROM perf_avg FORMAT Null</query>
<query>SELECT avgWeighted(num_f, num_f) FROM perf_avg FORMAT Null</query>
<query>SELECT avgWeighted(num_f, num_u) FROM perf_avg FORMAT Null</query>
<query>SELECT avgWeighted(num_u, num_f) FROM perf_avg FORMAT Null</query>
<query>SELECT avgWeighted(num_u, num) FROM perf_avg FORMAT Null</query>
<query>SELECT avgWeighted(num_u, num_u) FROM perf_avg FORMAT Null</query>
<query>SELECT avgWeighted(num_f, num_f) FROM perf_avg FORMAT Null</query>
<query>SELECT avgWeighted(toNullable(num_f), num_f) FROM perf_avg FORMAT Null</query>

View File

@ -0,0 +1,40 @@
#!/usr/bin/env bash
# Tags: no-random-merge-tree-settings
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -nm -q "
CREATE TABLE IF NOT EXISTS ts_data_double_raw
(
device_id UInt32 NOT NULL CODEC(ZSTD),
data_item_id UInt32 NOT NULL CODEC(ZSTD),
data_time DateTime64(3, 'UTC') NOT NULL CODEC(Delta, ZSTD),
data_value Float64 NOT NULL CODEC(Delta, ZSTD),
is_deleted Bool CODEC(ZSTD),
ingestion_time DateTime64(3, 'UTC') NOT NULL CODEC(Delta, ZSTD)
)
ENGINE = ReplacingMergeTree
PARTITION BY toYYYYMM(data_time)
ORDER BY (device_id, data_item_id, data_time)
SETTINGS index_granularity = 8192;
CREATE VIEW ts_data_double AS
SELECT
device_id,
data_item_id,
data_time,
argMax(data_value, ingestion_time) data_value,
max(ingestion_time) version,
argMax(is_deleted, ingestion_time) is_deleted
FROM ts_data_double_raw
GROUP BY device_id, data_item_id, data_time
HAVING is_deleted = 0;
INSERT INTO ts_data_double_raw VALUES (100, 1, fromUnixTimestamp64Milli(1697547086760), 3.6, false, fromUnixTimestamp64Milli(1)), (100, 1, fromUnixTimestamp64Milli(1697547086761), 4.6, false, fromUnixTimestamp64Milli(1));
INSERT INTO ts_data_double_raw VALUES (100, 1, fromUnixTimestamp64Milli(1697547086760), 3.6, true, fromUnixTimestamp64Milli(5)), (100, 1, fromUnixTimestamp64Milli(1697547086761), 4.6, false, fromUnixTimestamp64Milli(4));
"
$CLICKHOUSE_CLIENT -q "select 1697547086760 format RowBinary" | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&query=INSERT%20INTO%20ts_data_double_raw%20%28device_id%2C%20data_item_id%2C%20data_time%2C%20data_value%2C%20is_deleted%2C%20ingestion_time%29%0ASELECT%0A%20%20%20device_id%2C%0A%20%20%20data_item_id%2C%0A%20%20%20data_time%2C%0A%20%20%20data_value%2C%0A%20%20%201%2C%20%20--%20mark%20as%20deleted%0A%20%20%20fromUnixTimestamp64Milli%281697547088995%2C%20%27UTC%27%29%20--%20all%20inserted%20records%20have%20new%20ingestion%20time%0AFROM%20ts_data_double%0AWHERE%20%28device_id%20%3D%20100%29%20AND%20%28data_item_id%20%3D%201%29%0A%20%20%20%20AND%20%28data_time%20%3E%3D%20fromUnixTimestamp64Milli%280%2C%20%27UTC%27%29%29%0A%20%20%20%20AND%20%28data_time%20%3C%3D%20fromUnixTimestamp64Milli%281697547086764%2C%20%27UTC%27%29%29%0A%20%20%20%20AND%20version%20%3C%20fromUnixTimestamp64Milli%281697547088995%2C%20%27UTC%27%29%0A%20%20%20%20AND%20%28toUnixTimestamp64Milli%28data_time%29%20IN%20%28SELECT%20timestamp%20FROM%20input%28%27timestamp%20UInt64%27%29%29%29%20SETTINGS%20insert_quorum%3D1%0A%20FORMAT%20RowBinary" --data-binary @-
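For readability, the URL-encoded query passed to curl above decodes to the following (the same statement, whitespace adjusted); it has exactly the ... IN (SELECT ... FROM input()) shape that the predicate pushdown change above now skips:
INSERT INTO ts_data_double_raw (device_id, data_item_id, data_time, data_value, is_deleted, ingestion_time)
SELECT
    device_id,
    data_item_id,
    data_time,
    data_value,
    1,  -- mark as deleted
    fromUnixTimestamp64Milli(1697547088995, 'UTC') -- all inserted records have new ingestion time
FROM ts_data_double
WHERE (device_id = 100) AND (data_item_id = 1)
    AND (data_time >= fromUnixTimestamp64Milli(0, 'UTC'))
    AND (data_time <= fromUnixTimestamp64Milli(1697547086764, 'UTC'))
    AND version < fromUnixTimestamp64Milli(1697547088995, 'UTC')
    AND (toUnixTimestamp64Milli(data_time) IN (SELECT timestamp FROM input('timestamp UInt64')))
SETTINGS insert_quorum=1
FORMAT RowBinary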