Merge branch 'rewrite_array_exists' of https://github.com/bigo-sg/ClickHouse into rewrite_array_exists

This commit is contained in:
taiyang-li 2023-02-10 11:32:56 +08:00
commit 5c1425b322
23 changed files with 221 additions and 38 deletions

View File

@ -433,6 +433,11 @@ else()
link_libraries(global-group)
endif ()
# Build switch for GWP-Asan. It defaults to ON, but is overridden to OFF below
# whenever the target OS is neither Linux nor Android.
option (ENABLE_GWP_ASAN "Enable Gwp-Asan" ON)
if (NOT OS_LINUX AND NOT OS_ANDROID)
set(ENABLE_GWP_ASAN OFF)
endif ()
option(WERROR "Enable -Werror compiler option" ON)
if (WERROR)

View File

@ -114,6 +114,7 @@ endif()
add_contrib (llvm-project-cmake llvm-project)
add_contrib (libfuzzer-cmake llvm-project)
add_contrib (gwpasan-cmake llvm-project)
add_contrib (libxml2-cmake libxml2)
add_contrib (aws-cmake

View File

@ -0,0 +1,24 @@
# Builds GWP-ASan from the compiler-rt sources under contrib/llvm-project and
# publishes it as ch_contrib::gwp_asan. When ENABLE_GWP_ASAN is off, no target
# is defined at all.
if (NOT ENABLE_GWP_ASAN)
    message (STATUS "Not using gwp-asan")
    return ()
endif ()

set (COMPILER_RT_GWP_ASAN_SRC_DIR "${ClickHouse_SOURCE_DIR}/contrib/llvm-project/compiler-rt/lib/gwp_asan")

# Core allocator sources, the POSIX platform layer and the optional options parser.
set (GWP_ASAN_SOURCES
    "${COMPILER_RT_GWP_ASAN_SRC_DIR}/common.cpp"
    "${COMPILER_RT_GWP_ASAN_SRC_DIR}/crash_handler.cpp"
    "${COMPILER_RT_GWP_ASAN_SRC_DIR}/platform_specific/common_posix.cpp"
    "${COMPILER_RT_GWP_ASAN_SRC_DIR}/platform_specific/guarded_pool_allocator_posix.cpp"
    "${COMPILER_RT_GWP_ASAN_SRC_DIR}/platform_specific/mutex_posix.cpp"
    "${COMPILER_RT_GWP_ASAN_SRC_DIR}/platform_specific/utilities_posix.cpp"
    "${COMPILER_RT_GWP_ASAN_SRC_DIR}/guarded_pool_allocator.cpp"
    "${COMPILER_RT_GWP_ASAN_SRC_DIR}/stack_trace_compressor.cpp"
    "${COMPILER_RT_GWP_ASAN_SRC_DIR}/optional/options_parser.cpp"
)

# The include root is the parent "lib" directory, one level above gwp_asan/,
# so headers are reachable as <gwp_asan/...>.
set (GWP_ASAN_HEADERS "${ClickHouse_SOURCE_DIR}/contrib/llvm-project/compiler-rt/lib")

add_library (_gwp_asan ${GWP_ASAN_SOURCES})
target_include_directories (_gwp_asan SYSTEM PUBLIC "${GWP_ASAN_HEADERS}")

# Namespaced alias that the rest of the build links against.
add_library (ch_contrib::gwp_asan ALIAS _gwp_asan)

View File

@ -3310,6 +3310,15 @@ SELECT
FROM fuse_tbl
```
## optimize_rewrite_aggregate_function_with_if
Rewrites aggregate functions whose argument is an `if` expression, when the rewritten form is logically equivalent.
For example, `avg(if(cond, col, null))` can be rewritten to `avgOrNullIf(col, cond)`. This may improve performance.
:::note
Supported only with the experimental analyzer (`allow_experimental_analyzer = 1`).
:::
## allow_experimental_database_replicated {#allow_experimental_database_replicated}
Allows creating databases with the [Replicated](../../engines/database-engines/replicated.md) engine.

View File

@ -433,7 +433,6 @@ extern "C"
}
#endif
/// This allows to implement assert to forbid initialization of a class in static constructors.
/// Usage:
///

View File

@ -304,6 +304,11 @@ if (TARGET ch_contrib::llvm)
dbms_target_link_libraries (PUBLIC ch_contrib::llvm)
endif ()
# Link GWP-ASan privately into clickhouse_common_io and clickhouse_new_delete,
# but only when the ch_contrib::gwp_asan target has been defined.
if (TARGET ch_contrib::gwp_asan)
target_link_libraries (clickhouse_common_io PRIVATE ch_contrib::gwp_asan)
target_link_libraries (clickhouse_new_delete PRIVATE ch_contrib::gwp_asan)
endif()
# Otherwise it will slow down stack traces printing too much.
set_source_files_properties(
Common/Elf.cpp

View File

@ -157,6 +157,19 @@ static void deleteAttributesRecursive(Node * root)
}
}
/// Copies every attribute of `with_element` onto `config_element`, using the same attribute name and value.
static void mergeAttributes(Element & config_element, Element & with_element)
{
    /// The map returned by attributes() has to be released by the caller. Holding it in an AutoPtr
    /// releases it on every exit path, including the one where setAttribute() throws.
    Poco::AutoPtr<Poco::XML::NamedNodeMap> with_element_attributes(with_element.attributes());

    for (size_t i = 0; i < with_element_attributes->length(); ++i)
    {
        auto * attr = with_element_attributes->item(i);
        config_element.setAttribute(attr->nodeName(), attr->getNodeValue());
    }
}
void ConfigProcessor::mergeRecursive(XMLDocumentPtr config, Node * config_root, const Node * with_root)
{
const NodeListPtr with_nodes = with_root->childNodes();
@ -211,6 +224,9 @@ void ConfigProcessor::mergeRecursive(XMLDocumentPtr config, Node * config_root,
}
else
{
Element & config_element = dynamic_cast<Element &>(*config_node);
mergeAttributes(config_element, with_element);
mergeRecursive(config, config_node, with_node);
}
merged = true;

View File

@ -24,6 +24,7 @@
#cmakedefine01 USE_ODBC
#cmakedefine01 USE_REPLXX
#cmakedefine01 USE_JEMALLOC
#cmakedefine01 USE_GWP_ASAN
#cmakedefine01 USE_H3
#cmakedefine01 USE_S2_GEOMETRY
#cmakedefine01 USE_FASTOPS

View File

@ -17,6 +17,12 @@
# include <cstdlib>
#endif
#if USE_GWP_ASAN
# include <gwp_asan/guarded_pool_allocator.h>
static gwp_asan::GuardedPoolAllocator GuardedAlloc;
#endif
namespace Memory
{
@ -29,6 +35,23 @@ template <std::same_as<std::align_val_t>... TAlign>
requires DB::OptionalArgument<TAlign...>
inline ALWAYS_INLINE void * newImpl(std::size_t size, TAlign... align)
{
#if USE_GWP_ASAN
if (unlikely(GuardedAlloc.shouldSample()))
{
if constexpr (sizeof...(TAlign) == 1)
{
if (void * ptr = GuardedAlloc.allocate(size, alignToSizeT(align...)))
return ptr;
}
else
{
if (void * ptr = GuardedAlloc.allocate(size))
return ptr;
}
}
#endif
void * ptr = nullptr;
if constexpr (sizeof...(TAlign) == 1)
ptr = aligned_alloc(alignToSizeT(align...), size);
@ -44,16 +67,37 @@ inline ALWAYS_INLINE void * newImpl(std::size_t size, TAlign... align)
/// Non-throwing allocation of `size` bytes.
/// With GWP-ASan compiled in, an allocation selected for sampling is first requested from the guarded pool;
/// if the pool returns nullptr, or the allocation is not sampled, malloc() is used instead.
/// The result of the chosen allocator is returned as is.
inline ALWAYS_INLINE void * newNoExept(std::size_t size) noexcept
{
#if USE_GWP_ASAN
    if (unlikely(GuardedAlloc.shouldSample()))
    {
        void * guarded = GuardedAlloc.allocate(size);
        if (guarded != nullptr)
            return guarded;
    }
#endif

    return malloc(size);
}
/// Non-throwing allocation of `size` bytes with the requested alignment.
/// With GWP-ASan compiled in, an allocation selected for sampling is first requested from the guarded pool;
/// if the pool returns nullptr, or the allocation is not sampled, aligned_alloc() is used instead.
/// The result of the chosen allocator is returned as is.
inline ALWAYS_INLINE void * newNoExept(std::size_t size, std::align_val_t align) noexcept
{
#if USE_GWP_ASAN
    if (unlikely(GuardedAlloc.shouldSample()))
    {
        void * guarded = GuardedAlloc.allocate(size, alignToSizeT(align));
        if (guarded != nullptr)
            return guarded;
    }
#endif

    return aligned_alloc(static_cast<size_t>(align), size);
}
/// Frees `ptr`.
/// With GWP-ASan compiled in, a pointer that belongs to the guarded pool is handed back to that pool
/// and never reaches free(); every other pointer goes to free().
inline ALWAYS_INLINE void deleteImpl(void * ptr) noexcept
{
#if USE_GWP_ASAN
    const bool from_guarded_pool = GuardedAlloc.pointerIsMine(ptr);
    if (unlikely(from_guarded_pool))
    {
        GuardedAlloc.deallocate(ptr);
        return;
    }
#endif

    free(ptr);
}
@ -66,6 +110,14 @@ inline ALWAYS_INLINE void deleteSized(void * ptr, std::size_t size, TAlign... al
if (unlikely(ptr == nullptr))
return;
#if USE_GWP_ASAN
if (unlikely(GuardedAlloc.pointerIsMine(ptr)))
{
GuardedAlloc.deallocate(ptr);
return;
}
#endif
if constexpr (sizeof...(TAlign) == 1)
sdallocx(ptr, size, MALLOCX_ALIGN(alignToSizeT(align...)));
else
@ -78,6 +130,13 @@ template <std::same_as<std::align_val_t>... TAlign>
requires DB::OptionalArgument<TAlign...>
inline ALWAYS_INLINE void deleteSized(void * ptr, std::size_t size [[maybe_unused]], TAlign... /* align */) noexcept
{
#if USE_GWP_ASAN
if (unlikely(GuardedAlloc.pointerIsMine(ptr)))
{
GuardedAlloc.deallocate(ptr);
return;
}
#endif
free(ptr);
}
@ -122,6 +181,16 @@ template <std::same_as<std::align_val_t>... TAlign>
requires DB::OptionalArgument<TAlign...>
inline ALWAYS_INLINE void untrackMemory(void * ptr [[maybe_unused]], std::size_t size [[maybe_unused]] = 0, TAlign... align [[maybe_unused]]) noexcept
{
#if USE_GWP_ASAN
if (unlikely(GuardedAlloc.pointerIsMine(ptr)))
{
if (!size)
size = GuardedAlloc.getSize(ptr);
CurrentMemoryTracker::free(size);
return;
}
#endif
try
{
#if USE_JEMALLOC

View File

@ -1,4 +1,5 @@
#include <cassert>
#include <iostream>
#include <new>
#include "config.h"
#include <Common/memory.h>
@ -41,6 +42,26 @@ static struct InitializeJemallocZoneAllocatorForOSX
} initializeJemallocZoneAllocatorForOSX;
#endif
#if USE_GWP_ASAN
#include <gwp_asan/optional/options_parser.h>
/// Both clickhouse_new_delete and clickhouse_common_io link gwp_asan, but the guarded allocator must be
/// initialized only once; otherwise it causes an unexpected deadlock.
/// The constructor of this static object performs that initialization.
static struct InitGwpAsan
{
    InitGwpAsan()
    {
        /// Initialize the GWP-ASan options first, then pass them to the allocator.
        gwp_asan::options::initOptions();
        const auto & gwp_asan_options = gwp_asan::options::getOptions();
        GuardedAlloc.init(gwp_asan_options);
    }
} init_gwp_asan;
#endif
/// Replace default new/delete with memory tracking versions.
/// @sa https://en.cppreference.com/w/cpp/memory/new/operator_new

View File

@ -149,13 +149,13 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
if (buffer_size % min_alignment)
{
existing_memory = nullptr; /// Cannot reuse existing memory is it has unaligned size.
existing_memory = nullptr; /// Cannot reuse existing memory as it has unaligned size.
buffer_size = align_up(buffer_size);
}
if (reinterpret_cast<uintptr_t>(existing_memory) % min_alignment)
{
existing_memory = nullptr; /// Cannot reuse existing memory is it has unaligned offset.
existing_memory = nullptr; /// Cannot reuse existing memory as it has unaligned offset.
}
/// Attempt to open a file with O_DIRECT

View File

@ -886,8 +886,8 @@ void InterpreterSystemQuery::syncReplica()
if (auto * storage_replicated = dynamic_cast<StorageReplicatedMergeTree *>(table.get()))
{
LOG_TRACE(log, "Synchronizing entries in replica's queue with table's log and waiting for it to become empty");
if (!storage_replicated->waitForShrinkingQueueSize(0, getContext()->getSettingsRef().receive_timeout.totalMilliseconds()))
LOG_TRACE(log, "Synchronizing entries in replica's queue with table's log and waiting for current last entry to be processed");
if (!storage_replicated->waitForProcessingQueue(getContext()->getSettingsRef().receive_timeout.totalMilliseconds()))
{
LOG_ERROR(log, "SYNC REPLICA {}: Timed out!", table_id.getNameForLogs());
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "SYNC REPLICA {}: command timed out. " \

View File

@ -117,7 +117,7 @@ bool PullingAsyncPipelineExecutor::pull(Chunk & chunk, uint64_t milliseconds)
data->rethrowExceptionIfHas();
bool is_execution_finished
= !data->executor->checkTimeLimitSoft() || lazy_format ? lazy_format->isFinished() : data->is_finished.load();
= !data->executor->checkTimeLimitSoft() || (lazy_format ? lazy_format->isFinished() : data->is_finished.load());
if (is_execution_finished)
{

View File

@ -544,8 +544,7 @@ void ReplicatedMergeTreeQueue::removeProcessedEntry(zkutil::ZooKeeperPtr zookeep
if (!found && need_remove_from_zk)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find {} in the memory queue. It is a bug. Entry: {}",
entry->znode_name, entry->toString());
notifySubscribers(queue_size);
notifySubscribers(queue_size, entry->znode_name);
if (!need_remove_from_zk)
return;
@ -2466,12 +2465,17 @@ ReplicatedMergeTreeQueue::SubscriberHandler
ReplicatedMergeTreeQueue::addSubscriber(ReplicatedMergeTreeQueue::SubscriberCallBack && callback)
{
std::lock_guard lock(state_mutex);
std::unordered_set<String> result;
result.reserve(queue.size());
for (const auto & entry : queue)
result.insert(entry->znode_name);
std::lock_guard lock_subscribers(subscribers_mutex);
auto it = subscribers.emplace(subscribers.end(), std::move(callback));
/// Atomically notify about current size
(*it)(queue.size());
/// Notify queue size & log entry ids to avoid waiting for removed entries
(*it)(result.size(), result, std::nullopt);
return SubscriberHandler(it, *this);
}
@ -2482,16 +2486,16 @@ ReplicatedMergeTreeQueue::SubscriberHandler::~SubscriberHandler()
queue.subscribers.erase(it);
}
void ReplicatedMergeTreeQueue::notifySubscribers(size_t new_queue_size)
void ReplicatedMergeTreeQueue::notifySubscribers(size_t new_queue_size, std::optional<String> removed_log_entry_id)
{
std::lock_guard lock_subscribers(subscribers_mutex);
for (auto & subscriber_callback : subscribers)
subscriber_callback(new_queue_size);
subscriber_callback(new_queue_size, {}, removed_log_entry_id);
}
ReplicatedMergeTreeQueue::~ReplicatedMergeTreeQueue()
{
notifySubscribers(0);
notifySubscribers(0, std::nullopt);
}
String padIndex(Int64 index)

View File

@ -163,7 +163,7 @@ private:
/// A subscriber callback is called when an entry queue is deleted
mutable std::mutex subscribers_mutex;
using SubscriberCallBack = std::function<void(size_t /* queue_size */)>;
using SubscriberCallBack = std::function<void(size_t /* queue_size */, std::unordered_set<String> /*wait_for_ids*/, std::optional<String> /* removed_log_entry_id */)>;
using Subscribers = std::list<SubscriberCallBack>;
using SubscriberIterator = Subscribers::iterator;
@ -180,8 +180,8 @@ private:
Subscribers subscribers;
/// Notify subscribers about queue change
void notifySubscribers(size_t new_queue_size);
/// Notify subscribers about queue change (new queue size and entry that was removed)
void notifySubscribers(size_t new_queue_size, std::optional<String> removed_log_entry_id);
/// Check that entry_ptr is REPLACE_RANGE entry and can be removed from queue because current entry covers it
bool checkReplaceRangeCanBeRemoved(

View File

@ -7549,26 +7549,37 @@ void StorageReplicatedMergeTree::onActionLockRemove(StorageActionBlockType actio
background_moves_assignee.trigger();
}
bool StorageReplicatedMergeTree::waitForShrinkingQueueSize(size_t queue_size, UInt64 max_wait_milliseconds)
bool StorageReplicatedMergeTree::waitForProcessingQueue(UInt64 max_wait_milliseconds)
{
Stopwatch watch;
/// Let's fetch new log entries firstly
queue.pullLogsToQueue(getZooKeeperAndAssertNotReadonly(), {}, ReplicatedMergeTreeQueue::SYNC);
/// This is significant, because the execution of this task could be delayed at BackgroundPool.
/// And we force it to be executed.
background_operations_assignee.trigger();
Poco::Event target_size_event;
auto callback = [&target_size_event, queue_size] (size_t new_queue_size)
std::unordered_set<String> wait_for_ids;
bool set_ids_to_wait = true;
Poco::Event target_entry_event;
auto callback = [&target_entry_event, &wait_for_ids, &set_ids_to_wait](size_t new_queue_size, std::unordered_set<String> log_entry_ids, std::optional<String> removed_log_entry_id)
{
if (new_queue_size <= queue_size)
target_size_event.set();
if (set_ids_to_wait)
{
wait_for_ids = log_entry_ids;
set_ids_to_wait = false;
}
if (removed_log_entry_id.has_value())
wait_for_ids.erase(removed_log_entry_id.value());
if (wait_for_ids.empty() || new_queue_size == 0)
target_entry_event.set();
};
const auto handler = queue.addSubscriber(std::move(callback));
while (!target_size_event.tryWait(50))
while (!target_entry_event.tryWait(50))
{
if (max_wait_milliseconds && watch.elapsedMilliseconds() > max_wait_milliseconds)
return false;
@ -7576,7 +7587,6 @@ bool StorageReplicatedMergeTree::waitForShrinkingQueueSize(size_t queue_size, UI
if (partial_shutdown_called)
throw Exception(ErrorCodes::ABORTED, "Shutdown is called for table");
}
return true;
}

View File

@ -178,9 +178,9 @@ public:
void onActionLockRemove(StorageActionBlockType action_type) override;
/// Wait when replication queue size becomes less or equal than queue_size
/// Wait till replication queue's current last entry is processed or till size becomes 0
/// If timeout is exceeded returns false
bool waitForShrinkingQueueSize(size_t queue_size = 0, UInt64 max_wait_milliseconds = 0);
bool waitForProcessingQueue(UInt64 max_wait_milliseconds = 0);
/// Get the status of the table. If with_zk_fields = false - do not fill in the fields that require queries to ZK.
void getStatus(ReplicatedTableStatus & res, bool with_zk_fields = true);

View File

@ -109,6 +109,9 @@ endif()
if (TARGET ch_contrib::jemalloc)
set(USE_JEMALLOC 1)
endif()
# Set USE_GWP_ASAN when the ch_contrib::gwp_asan target exists; the value feeds
# the `#cmakedefine01 USE_GWP_ASAN` entry of the config header template.
if (TARGET ch_contrib::gwp_asan)
set(USE_GWP_ASAN 1)
endif()
if (TARGET ch_contrib::h3)
set(USE_H3 1)
endif()

View File

@ -56,6 +56,13 @@ class Reviews:
logging.info("There aren't reviews for PR #%s", self.pr.number)
return False
logging.info(
"The following users have reviewed the PR:\n %s",
"\n ".join(
f"{user.login}: {review.state}" for user, review in self.reviews.items()
),
)
filtered_reviews = {
user: review
for user, review in self.reviews.items()
@ -125,7 +132,11 @@ class Reviews:
return False
return True
logging.info("The PR #%s is not approved", self.pr.number)
logging.info(
"The PR #%s is not approved by any of %s team member",
self.pr.number,
TEAM_NAME,
)
return False

View File

@ -91,7 +91,7 @@ class HTTPError(Exception):
# Helpers to execute queries via HTTP interface.
def clickhouse_execute_http(
base_args, query, timeout=30, settings=None, default_format=None
base_args, query, timeout=30, settings=None, default_format=None, max_http_retries=5
):
if args.secure:
client = http.client.HTTPSConnection(
@ -120,7 +120,7 @@ def clickhouse_execute_http(
if default_format is not None:
params["default_format"] = default_format
for i in range(MAX_RETRIES):
for i in range(max_http_retries):
try:
client.request(
"POST",
@ -130,7 +130,7 @@ def clickhouse_execute_http(
data = res.read()
break
except Exception as ex:
if i == MAX_RETRIES - 1:
if i == max_http_retries - 1:
raise ex
sleep(i + 1)
@ -140,13 +140,12 @@ def clickhouse_execute_http(
return data
def clickhouse_execute(base_args, query, timeout=30, settings=None):
return clickhouse_execute_http(base_args, query, timeout, settings).strip()
def clickhouse_execute(base_args, query, timeout=30, settings=None, max_http_retries=5):
return clickhouse_execute_http(base_args, query, timeout, settings, max_http_retries=max_http_retries).strip()
def clickhouse_execute_json(base_args, query, timeout=60, settings=None):
data = clickhouse_execute_http(base_args, query, timeout, settings, "JSONEachRow")
def clickhouse_execute_json(base_args, query, timeout=60, settings=None, max_http_retries=5):
data = clickhouse_execute_http(base_args, query, timeout, settings, "JSONEachRow", max_http_retries=max_http_retries)
if not data:
return None
rows = []
@ -641,7 +640,7 @@ class TestCase:
clickhouse_execute(
args,
"CREATE DATABASE " + database + get_db_engine(testcase_args, database),
"CREATE DATABASE IF NOT EXISTS " + database + get_db_engine(testcase_args, database),
settings=get_create_database_settings(args, testcase_args),
)
@ -1139,7 +1138,7 @@ class TestCase:
seconds_left = max(
args.timeout - (datetime.now() - start_time).total_seconds(), 20
)
drop_database_query = "DROP DATABASE " + database
drop_database_query = "DROP DATABASE IF EXISTS " + database
if args.replicated_database:
drop_database_query += " ON CLUSTER test_cluster_database_replicated"
@ -1670,7 +1669,7 @@ def check_server_started(args):
retry_count = args.server_check_retries
while retry_count > 0:
try:
clickhouse_execute(args, "SELECT 1")
clickhouse_execute(args, "SELECT 1", max_http_retries=1)
print(" OK")
sys.stdout.flush()
return True

View File

@ -0,0 +1,2 @@
# mark_cache_size is not given as a literal: '@from_env' names the environment variable CONFIG_TEST_ENV.
# NOTE(review): presumably the config processor substitutes that variable's value here — confirm.
mark_cache_size:
'@from_env': CONFIG_TEST_ENV

View File

@ -271,4 +271,6 @@
<anonymize>false</anonymize>
<endpoint>https://6f33034cfe684dd7a3ab9875e57b1c8d@o388870.ingest.sentry.io/5226277</endpoint>
</send_crash_reports>
<mark_cache_size>123451234</mark_cache_size>
</clickhouse>

View File

@ -21,6 +21,7 @@ def test_extra_yaml_mix():
"configs/config.d/logging_no_rotate.xml",
"configs/config.d/log_to_console.yaml",
"configs/config.d/macros.yaml",
"configs/config.d/mark_cache_size.yaml",
"configs/config.d/metric_log.xml",
"configs/config.d/more_clusters.yaml",
"configs/config.d/part_log.xml",
@ -46,6 +47,7 @@ def test_extra_yaml_mix():
users_config_name="users.yaml",
copy_common_configs=False,
config_root_name="clickhouse",
env_variables={"CONFIG_TEST_ENV": "8956"},
)
try: