CLICKHOUSE-3800: Review fixes

This commit is contained in:
alesapin 2018-09-03 13:14:05 +03:00
parent 83574eed7c
commit a7bd2b42e8
10 changed files with 266 additions and 223 deletions

View File

@ -161,6 +161,29 @@ public:
virtual ~LRUCache() {}
protected:
using LRUQueue = std::list<Key>;
using LRUQueueIterator = typename LRUQueue::iterator;
/// One cache entry: the stored value together with the bookkeeping kept next to it
/// (its size, its position in the LRU queue and a timestamp).
struct Cell
{
    /// Decide whether this entry counts as expired relative to `last_timestamp`.
    /// A zero `delay` makes every entry expired; otherwise the entry is expired only when
    /// `last_timestamp` is strictly later than the stored timestamp by more than `delay`.
    bool expired(const Timestamp & last_timestamp, const Delay & delay) const
    {
        if (delay == Delay::zero())
            return true;

        if (!(last_timestamp > timestamp))
            return false;

        return (last_timestamp - timestamp) > delay;
    }

    MappedPtr value;
    size_t size;
    LRUQueueIterator queue_iterator;
    Timestamp timestamp;
};
using Cells = std::unordered_map<Key, Cell, HashFunction>;
Cells cells;
mutable std::mutex mutex;
private:
/// Represents pending insertion attempt.
@ -226,36 +249,16 @@ private:
friend struct InsertTokenHolder;
using LRUQueue = std::list<Key>;
using LRUQueueIterator = typename LRUQueue::iterator;
/// Cache entry: value plus its size, LRU queue position and timestamp.
struct Cell
{
    /// True when `expiration_delay` is zero, or when `last_timestamp` is strictly greater than
    /// the entry's own timestamp and their difference exceeds `expiration_delay`.
    bool expired(const Timestamp & last_timestamp, const Delay & expiration_delay) const
    {
        const bool zero_delay = (expiration_delay == Delay::zero());
        if (zero_delay)
            return true;

        /// The subtraction is only evaluated when last_timestamp is the later one.
        const bool is_later = (last_timestamp > timestamp);
        return is_later && ((last_timestamp - timestamp) > expiration_delay);
    }

    MappedPtr value;
    size_t size;
    LRUQueueIterator queue_iterator;
    Timestamp timestamp;
};
using Cells = std::unordered_map<Key, Cell, HashFunction>;
InsertTokenById insert_tokens;
LRUQueue queue;
Cells cells;
/// Total weight of values.
size_t current_size = 0;
const size_t max_size;
const Delay expiration_delay;
mutable std::mutex mutex;
std::atomic<size_t> hits {0};
std::atomic<size_t> misses {0};

View File

@ -5,178 +5,175 @@
/// Available events. Add something here as you wish.
#define APPLY_FOR_EVENTS(M) \
M(Query) \
M(SelectQuery) \
M(InsertQuery) \
M(FileOpen) \
M(FileOpenFailed) \
M(Seek) \
M(ReadBufferFromFileDescriptorRead) \
M(ReadBufferFromFileDescriptorReadFailed) \
M(ReadBufferFromFileDescriptorReadBytes) \
M(WriteBufferFromFileDescriptorWrite) \
M(WriteBufferFromFileDescriptorWriteFailed) \
M(WriteBufferFromFileDescriptorWriteBytes) \
M(ReadBufferAIORead) \
M(ReadBufferAIOReadBytes) \
M(WriteBufferAIOWrite) \
M(WriteBufferAIOWriteBytes) \
M(ReadCompressedBytes) \
M(CompressedReadBufferBlocks) \
M(CompressedReadBufferBytes) \
M(UncompressedCacheHits) \
M(UncompressedCacheMisses) \
M(UncompressedCacheWeightLost) \
M(IOBufferAllocs) \
M(IOBufferAllocBytes) \
M(ArenaAllocChunks) \
M(ArenaAllocBytes) \
M(FunctionExecute) \
M(TableFunctionExecute) \
M(MarkCacheHits) \
M(MarkCacheMisses) \
M(CreatedReadBufferOrdinary) \
M(CreatedReadBufferAIO) \
M(CreatedWriteBufferOrdinary) \
M(CreatedWriteBufferAIO) \
M(DiskReadElapsedMicroseconds) \
M(DiskWriteElapsedMicroseconds) \
M(NetworkReceiveElapsedMicroseconds) \
M(NetworkSendElapsedMicroseconds) \
M(ThrottlerSleepMicroseconds) \
M(Query, "Number of queries started to be interpreted and maybe executed. Does not include queries that are failed to parse, that are rejected due to AST size limits; rejected due to quota limits or limits on number of simultaneously running queries. May include internal queries initiated by ClickHouse itself. Does not count subqueries.") \
M(SelectQuery, "Same as Query, but only for SELECT queries.") \
M(InsertQuery, "Same as Query, but only for INSERT queries.") \
M(FileOpen, "Number of files opened.") \
M(Seek, "Number of times the 'lseek' function was called.") \
M(ReadBufferFromFileDescriptorRead, "Number of reads (read/pread) from a file descriptor. Does not include sockets.") \
M(ReadBufferFromFileDescriptorReadFailed, "Number of times the read (read/pread) from a file descriptor have failed.") \
M(ReadBufferFromFileDescriptorReadBytes, "Number of bytes read from file descriptors. If the file is compressed, this will show compressed data size.") \
M(WriteBufferFromFileDescriptorWrite, "Number of writes (write/pwrite) to a file descriptor. Does not include sockets.") \
M(WriteBufferFromFileDescriptorWriteFailed, "Number of times the write (write/pwrite) to a file descriptor have failed.") \
M(WriteBufferFromFileDescriptorWriteBytes, "Number of bytes written to file descriptors. If the file is compressed, this will show compressed data size.") \
M(ReadBufferAIORead, "") \
M(ReadBufferAIOReadBytes, "") \
M(WriteBufferAIOWrite, "") \
M(WriteBufferAIOWriteBytes, "") \
M(ReadCompressedBytes, "") \
M(CompressedReadBufferBlocks, "") \
M(CompressedReadBufferBytes, "") \
M(UncompressedCacheHits, "") \
M(UncompressedCacheMisses, "") \
M(UncompressedCacheWeightLost, "") \
M(IOBufferAllocs, "") \
M(IOBufferAllocBytes, "") \
M(ArenaAllocChunks, "") \
M(ArenaAllocBytes, "") \
M(FunctionExecute, "") \
M(TableFunctionExecute, "") \
M(MarkCacheHits, "") \
M(MarkCacheMisses, "") \
M(CreatedReadBufferOrdinary, "") \
M(CreatedReadBufferAIO, "") \
M(CreatedWriteBufferOrdinary, "") \
M(CreatedWriteBufferAIO, "") \
M(DiskReadElapsedMicroseconds, "Total time spent waiting for read syscall. This include reads from page cache.") \
M(DiskWriteElapsedMicroseconds, "Total time spent waiting for write syscall. This include writes to page cache.") \
M(NetworkReceiveElapsedMicroseconds, "") \
M(NetworkSendElapsedMicroseconds, "") \
M(ThrottlerSleepMicroseconds, "Total time a query was sleeping to conform the 'max_network_bandwidth' setting.") \
\
M(ReplicatedPartFetches) \
M(ReplicatedPartFailedFetches) \
M(ObsoleteReplicatedParts) \
M(ReplicatedPartMerges) \
M(ReplicatedPartFetchesOfMerged) \
M(ReplicatedPartMutations) \
M(ReplicatedPartChecks) \
M(ReplicatedPartChecksFailed) \
M(ReplicatedDataLoss) \
M(ReplicatedPartFetches, "Number of times a data part was downloaded from replica of a ReplicatedMergeTree table.") \
M(ReplicatedPartFailedFetches, "") \
M(ObsoleteReplicatedParts, "") \
M(ReplicatedPartMerges, "") \
M(ReplicatedPartFetchesOfMerged, "Number of times we prefer to download already merged part from replica of ReplicatedMergeTree table instead of performing a merge ourself (usually we prefer doing a merge ourself to save network traffic). This happens when we have not all source parts to perform a merge or when the data part is old enough.") \
M(ReplicatedPartMutations, "") \
M(ReplicatedPartChecks, "") \
M(ReplicatedPartChecksFailed, "") \
M(ReplicatedDataLoss, "Number of times a data part that we wanted doesn't exist on any replica (even on replicas that are offline right now). That data parts are definitely lost. This is normal due to asynchronous replication (if quorum inserts were not enabled), when the replica on which the data part was written was failed and when it became online after fail it doesn't contain that data part.") \
\
M(InsertedRows) \
M(InsertedBytes) \
M(DelayedInserts) \
M(RejectedInserts) \
M(DelayedInsertsMilliseconds) \
M(DuplicatedInsertedBlocks) \
M(InsertedRows, "Number of rows INSERTed to all tables.") \
M(InsertedBytes, "Number of bytes (uncompressed; for columns as they stored in memory) INSERTed to all tables.") \
M(DelayedInserts, "Number of times the INSERT of a block to a MergeTree table was throttled due to high number of active data parts for partition.") \
M(RejectedInserts, "Number of times the INSERT of a block to a MergeTree table was rejected with 'Too many parts' exception due to high number of active data parts for partition.") \
M(DelayedInsertsMilliseconds, "Total number of milliseconds spent while the INSERT of a block to a MergeTree table was throttled due to high number of active data parts for partition.") \
M(DuplicatedInsertedBlocks, "Number of times the INSERTed block to a ReplicatedMergeTree table was deduplicated.") \
\
M(ZooKeeperInit) \
M(ZooKeeperTransactions) \
M(ZooKeeperList) \
M(ZooKeeperCreate) \
M(ZooKeeperRemove) \
M(ZooKeeperExists) \
M(ZooKeeperGet) \
M(ZooKeeperSet) \
M(ZooKeeperMulti) \
M(ZooKeeperCheck) \
M(ZooKeeperClose) \
M(ZooKeeperWatchResponse) \
M(ZooKeeperUserExceptions) \
M(ZooKeeperHardwareExceptions) \
M(ZooKeeperOtherExceptions) \
M(ZooKeeperWaitMicroseconds) \
M(ZooKeeperBytesSent) \
M(ZooKeeperBytesReceived) \
M(ZooKeeperInit, "") \
M(ZooKeeperTransactions, "") \
M(ZooKeeperList, "") \
M(ZooKeeperCreate, "") \
M(ZooKeeperRemove, "") \
M(ZooKeeperExists, "") \
M(ZooKeeperGet, "") \
M(ZooKeeperSet, "") \
M(ZooKeeperMulti, "") \
M(ZooKeeperCheck, "") \
M(ZooKeeperClose, "") \
M(ZooKeeperWatchResponse, "") \
M(ZooKeeperUserExceptions, "") \
M(ZooKeeperHardwareExceptions, "") \
M(ZooKeeperOtherExceptions, "") \
M(ZooKeeperWaitMicroseconds, "") \
M(ZooKeeperBytesSent, "") \
M(ZooKeeperBytesReceived, "") \
\
M(DistributedConnectionFailTry) \
M(DistributedConnectionMissingTable) \
M(DistributedConnectionStaleReplica) \
M(DistributedConnectionFailAtAll) \
M(DistributedConnectionFailTry, "") \
M(DistributedConnectionMissingTable, "") \
M(DistributedConnectionStaleReplica, "") \
M(DistributedConnectionFailAtAll, "") \
\
M(CompileAttempt) \
M(CompileSuccess) \
M(CompileAttempt, "Number of times a compilation of generated C++ code was initiated.") \
M(CompileSuccess, "Number of times a compilation of generated C++ code was successful.") \
\
M(CompileFunction) \
M(CompiledFunctionExecute) \
M(CompiledCacheSizeBytes) \
M(CompileFunction, "Number of times a compilation of generated LLVM code (to create fused function for complex expressions) was initiated.") \
M(CompiledFunctionExecute, "Number of times a compiled function was executed.") \
M(CompileExpressionsMicroseconds, "Total time spent for compilation of expressions to LLVM code.") \
\
M(ExternalSortWritePart) \
M(ExternalSortMerge) \
M(ExternalAggregationWritePart) \
M(ExternalAggregationMerge) \
M(ExternalAggregationCompressedBytes) \
M(ExternalAggregationUncompressedBytes) \
M(ExternalSortWritePart, "") \
M(ExternalSortMerge, "") \
M(ExternalAggregationWritePart, "") \
M(ExternalAggregationMerge, "") \
M(ExternalAggregationCompressedBytes, "") \
M(ExternalAggregationUncompressedBytes, "") \
\
M(SlowRead) \
M(ReadBackoff) \
M(SlowRead, "Number of reads from a file that were slow. This indicate system overload. Thresholds are controlled by read_backoff_* settings.") \
M(ReadBackoff, "Number of times the number of query processing threads was lowered due to slow reads.") \
\
M(ReplicaYieldLeadership) \
M(ReplicaPartialShutdown) \
M(ReplicaYieldLeadership, "Number of times Replicated table was yielded its leadership due to large replication lag relative to other replicas.") \
M(ReplicaPartialShutdown, "") \
\
M(SelectedParts) \
M(SelectedRanges) \
M(SelectedMarks) \
M(SelectedParts, "Number of data parts selected to read from a MergeTree table.") \
M(SelectedRanges, "Number of (non-adjacent) ranges in all data parts selected to read from a MergeTree table.") \
M(SelectedMarks, "Number of marks (index granules) selected to read from a MergeTree table.") \
\
M(MergedRows) \
M(MergedUncompressedBytes) \
M(MergesTimeMilliseconds)\
M(MergedRows, "Rows read for background merges. This is the number of rows before merge.") \
M(MergedUncompressedBytes, "Uncompressed bytes (for columns as they stored in memory) that was read for background merges. This is the number before merge.") \
M(MergesTimeMilliseconds, "Total time spent for background merges.")\
\
M(MergeTreeDataWriterRows) \
M(MergeTreeDataWriterUncompressedBytes) \
M(MergeTreeDataWriterCompressedBytes) \
M(MergeTreeDataWriterBlocks) \
M(MergeTreeDataWriterBlocksAlreadySorted) \
M(MergeTreeDataWriterRows, "Number of rows INSERTed to MergeTree tables.") \
M(MergeTreeDataWriterUncompressedBytes, "Uncompressed bytes (for columns as they stored in memory) INSERTed to MergeTree tables.") \
M(MergeTreeDataWriterCompressedBytes, "Bytes written to filesystem for data INSERTed to MergeTree tables.") \
M(MergeTreeDataWriterBlocks, "Number of blocks INSERTed to MergeTree tables. Each block forms a data part of level zero.") \
M(MergeTreeDataWriterBlocksAlreadySorted, "Number of blocks INSERTed to MergeTree tables that appeared to be already sorted.") \
\
M(ObsoleteEphemeralNode) \
M(CannotRemoveEphemeralNode) \
M(LeaderElectionAcquiredLeadership) \
M(CannotRemoveEphemeralNode, "Number of times an error happened while trying to remove ephemeral node. This is not an issue, because our implementation of ZooKeeper library guarantee that the session will expire and the node will be removed.") \
M(LeaderElectionAcquiredLeadership, "Number of times a ReplicatedMergeTree table became a leader. Leader replica is responsible for assigning merges, cleaning old blocks for deduplications and a few more bookkeeping tasks.") \
\
M(RegexpCreated) \
M(ContextLock) \
M(RegexpCreated, "Compiled regular expressions. Identical regular expressions compiled just once and cached forever.") \
M(ContextLock, "Number of times the lock of Context was acquired or tried to acquire. This is global lock.") \
\
M(StorageBufferFlush) \
M(StorageBufferErrorOnFlush) \
M(StorageBufferPassedAllMinThresholds) \
M(StorageBufferPassedTimeMaxThreshold) \
M(StorageBufferPassedRowsMaxThreshold) \
M(StorageBufferPassedBytesMaxThreshold) \
M(StorageBufferFlush, "") \
M(StorageBufferErrorOnFlush, "") \
M(StorageBufferPassedAllMinThresholds, "") \
M(StorageBufferPassedTimeMaxThreshold, "") \
M(StorageBufferPassedRowsMaxThreshold, "") \
M(StorageBufferPassedBytesMaxThreshold, "") \
\
M(DictCacheKeysRequested) \
M(DictCacheKeysRequestedMiss) \
M(DictCacheKeysRequestedFound) \
M(DictCacheKeysExpired) \
M(DictCacheKeysNotFound) \
M(DictCacheKeysHit) \
M(DictCacheRequestTimeNs) \
M(DictCacheRequests) \
M(DictCacheLockWriteNs) \
M(DictCacheLockReadNs) \
M(DictCacheKeysRequested, "") \
M(DictCacheKeysRequestedMiss, "") \
M(DictCacheKeysRequestedFound, "") \
M(DictCacheKeysExpired, "") \
M(DictCacheKeysNotFound, "") \
M(DictCacheKeysHit, "") \
M(DictCacheRequestTimeNs, "") \
M(DictCacheRequests, "") \
M(DictCacheLockWriteNs, "") \
M(DictCacheLockReadNs, "") \
\
M(DistributedSyncInsertionTimeoutExceeded) \
M(DataAfterMergeDiffersFromReplica) \
M(DataAfterMutationDiffersFromReplica) \
M(PolygonsAddedToPool) \
M(PolygonsInPoolAllocatedBytes) \
M(RWLockAcquiredReadLocks) \
M(RWLockAcquiredWriteLocks) \
M(RWLockReadersWaitMilliseconds) \
M(RWLockWritersWaitMilliseconds) \
M(NetworkErrors) \
M(DistributedSyncInsertionTimeoutExceeded, "") \
M(DataAfterMergeDiffersFromReplica, "") \
M(DataAfterMutationDiffersFromReplica, "") \
M(PolygonsAddedToPool, "") \
M(PolygonsInPoolAllocatedBytes, "") \
M(RWLockAcquiredReadLocks, "") \
M(RWLockAcquiredWriteLocks, "") \
M(RWLockReadersWaitMilliseconds, "") \
M(RWLockWritersWaitMilliseconds, "") \
M(NetworkErrors, "") \
\
M(RealTimeMicroseconds) \
M(UserTimeMicroseconds) \
M(SystemTimeMicroseconds) \
M(SoftPageFaults) \
M(HardPageFaults) \
M(VoluntaryContextSwitches) \
M(InvoluntaryContextSwitches) \
M(RealTimeMicroseconds, "Total (wall clock) time spent in processing (queries and other tasks) threads (not that this is a sum).") \
M(UserTimeMicroseconds, "Total time spent in processing (queries and other tasks) threads executing CPU instructions in user space. This include time CPU pipeline was stalled due to cache misses, branch mispredictions, hyper-threading, etc.") \
M(SystemTimeMicroseconds, "Total time spent in processing (queries and other tasks) threads executing CPU instructions in OS kernel space. This include time CPU pipeline was stalled due to cache misses, branch mispredictions, hyper-threading, etc.") \
M(SoftPageFaults, "") \
M(HardPageFaults, "") \
M(VoluntaryContextSwitches, "") \
M(InvoluntaryContextSwitches, "") \
\
M(OSIOWaitMicroseconds) \
M(OSCPUWaitMicroseconds) \
M(OSCPUVirtualTimeMicroseconds) \
M(OSReadBytes) \
M(OSWriteBytes) \
M(OSReadChars) \
M(OSWriteChars) \
M(OSIOWaitMicroseconds, "Total time a thread spent waiting for a result of IO operation, from the OS point of view. This is real IO that doesn't include page cache.") \
M(OSCPUWaitMicroseconds, "Total time a thread was ready for execution but waiting to be scheduled by OS, from the OS point of view.") \
M(OSCPUVirtualTimeMicroseconds, "CPU time spent seen by OS. Does not include involuntary waits due to virtualization.") \
M(OSReadBytes, "Number of bytes read from disks or block devices. Doesn't include bytes read from page cache. May include excessive data due to block size, readahead, etc.") \
M(OSWriteBytes, "Number of bytes written to disks or block devices. Doesn't include bytes that are in page cache dirty pages. May not include data that was written by OS asynchronously.") \
M(OSReadChars, "Number of bytes read from filesystem, including page cache.") \
M(OSWriteChars, "Number of bytes written to filesystem, including page cache.") \
namespace ProfileEvents
{
#define M(NAME) extern const Event NAME = __COUNTER__;
#define M(NAME, DOCUMENTATION) extern const Event NAME = __COUNTER__;
APPLY_FOR_EVENTS(M)
#undef M
constexpr Event END = __COUNTER__;
@ -220,16 +217,28 @@ Counters Counters::getPartiallyAtomicSnapshot() const
return res;
}
const char * getDescription(Event event)
const char * getName(Event event)
{
static const char * descriptions[] =
static const char * strings[] =
{
#define M(NAME) #NAME,
#define M(NAME, DOCUMENTATION) #NAME,
APPLY_FOR_EVENTS(M)
#undef M
};
return descriptions[event];
return strings[event];
}
/// Returns the DOCUMENTATION string of the APPLY_FOR_EVENTS entry whose position equals `event`.
/// The table is a function-local static array of string literals; `event` is used as a raw index
/// and is not range-checked here.
const char * getDocumentation(Event event)
{
#define M(NAME, DOCUMENTATION) DOCUMENTATION,
    static const char * documentation_by_event[] = { APPLY_FOR_EVENTS(M) };
#undef M

    return documentation_by_event[event];
}
@ -241,11 +250,6 @@ void increment(Event event, Count amount)
DB::CurrentThread::getProfileEvents().increment(event, amount);
}
/// Forwards to reset(event) on the counters returned by DB::CurrentThread::getProfileEvents().
void reset(Event event)
{
DB::CurrentThread::getProfileEvents().reset(event);
}
}
#undef APPLY_FOR_EVENTS

View File

@ -59,16 +59,6 @@ namespace ProfileEvents
} while (current != nullptr);
}
/// Store zero (relaxed ordering) into the counter for `event` in this object and then in
/// every ancestor reached by following `parent` until it is null.
inline void reset(Event event)
{
    for (Counters * node = this; node != nullptr; node = node->parent)
        node->counters[event].store(0, std::memory_order_relaxed);
}
/// Every single value is fetched atomically, but not all values as a whole.
Counters getPartiallyAtomicSnapshot() const;
@ -96,11 +86,11 @@ namespace ProfileEvents
/// Increment a counter for event. Thread-safe.
void increment(Event event, Count amount = 1);
/// Set event value to zero. Thread-safe.
void reset(Event event);
/// Get name of event by identifier. Returns statically allocated string.
const char * getName(Event event);
/// Get text description of event by identifier. Returns statically allocated string.
const char * getDescription(Event event);
/// Get description of event by identifier. Returns statically allocated string.
const char * getDocumentation(Event event);
/// Get index just after last event identifier.
Event end();

View File

@ -14,6 +14,7 @@
*/
#include <common/Types.h>
#include <common/unaligned.h>
#include <string>
#include <type_traits>
@ -107,7 +108,7 @@ public:
while (data + 8 <= end)
{
current_word = *reinterpret_cast<const UInt64 *>(data);
current_word = unalignedLoad<UInt64>(data);
v3 ^= current_word;
SIPROUND;
@ -141,7 +142,7 @@ public:
void update(const std::string & x)
{
update(x.c_str(), x.length());
update(x.data(), x.length());
}
/// Get the result in some form. This can only be done once!

View File

@ -1,8 +1,10 @@
#include <Interpreters/AsynchronousMetrics.h>
#include <Interpreters/ExpressionJIT.h>
#include <Common/Exception.h>
#include <Common/setThreadName.h>
#include <Common/CurrentMetrics.h>
#include <Common/typeid_cast.h>
#include <Common/config.h>
#include <Storages/MarkCache.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
@ -132,6 +134,16 @@ void AsynchronousMetrics::update()
}
}
#if USE_EMBEDDED_COMPILER
{
if (auto compiled_expression_cache = context.getCompiledExpressionsCache())
{
set("CompiledExpressionsCacheBytes", compiled_expression_cache->weight());
set("CompiledExpressionsCacheCount", compiled_expression_cache->count());
}
}
#endif
set("Uptime", context.getUptimeSeconds());
{

View File

@ -1829,10 +1829,7 @@ void Context::dropCompiledExpressionsCache() const
{
auto lock = getLock();
if (shared->compiled_expression_cache)
{
shared->compiled_expression_cache->reset();
ProfileEvents::reset(ProfileEvents::CompiledCacheSizeBytes);
}
}
#endif

View File

@ -81,10 +81,7 @@ using ActionLocksManagerPtr = std::shared_ptr<ActionLocksManager>;
#if USE_EMBEDDED_COMPILER
struct ExpressionAction;
struct ActionsHash;
class LLVMFunction;
using CompiledExpressionCache = LRUCache<std::vector<ExpressionAction>, LLVMFunction, ActionsHash>;
class CompiledExpressionCache;
#endif

View File

@ -188,8 +188,6 @@ void ExpressionAction::prepare(Block & sample_block)
all_const = false;
}
ColumnPtr new_column;
/// If all arguments are constants, and function is suitable to be executed in 'prepare' stage - execute function.
if (all_const && function->isSuitableForConstantFolding())
{
@ -639,7 +637,7 @@ void ExpressionActions::prependProjectInput()
actions.insert(actions.begin(), ExpressionAction::project(getRequiredColumns()));
}
void ExpressionActions::prependArrayJoin(const ExpressionAction & action, const Block & sample_block)
void ExpressionActions::prependArrayJoin(const ExpressionAction & action, const Block & sample_block_before)
{
if (action.type != ExpressionAction::ARRAY_JOIN)
throw Exception("ARRAY_JOIN action expected", ErrorCodes::LOGICAL_ERROR);
@ -655,7 +653,7 @@ void ExpressionActions::prependArrayJoin(const ExpressionAction & action, const
}
for (const std::string & name : array_join_set)
{
input_columns.emplace_back(name, sample_block.getByName(name).type);
input_columns.emplace_back(name, sample_block_before.getByName(name).type);
actions.insert(actions.begin(), ExpressionAction::removeColumn(name));
}
@ -1068,9 +1066,9 @@ BlockInputStreamPtr ExpressionActions::createStreamWithNonJoinedDataIfFullOrRigh
}
/// It is not important to calculate the hash of individual strings or their concatenation
size_t ExpressionAction::ActionHash::operator()(const ExpressionAction & action) const
{
SipHash hash;
hash.update(action.type);
hash.update(action.is_function_compiled);

View File

@ -8,8 +8,10 @@
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnVector.h>
#include <Common/LRUCache.h>
#include <Common/MemoryTracker.h>
#include <Common/typeid_cast.h>
#include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/Native.h>
@ -49,7 +51,7 @@
namespace ProfileEvents
{
extern const Event CompileFunction;
extern const Event CompiledCacheSizeBytes;
extern const Event CompileExpressionsMicroseconds;
}
namespace DB
@ -163,17 +165,16 @@ auto wrapJITSymbolResolver(llvm::JITSymbolResolver & jsr)
struct CountingMMapper final : public llvm::SectionMemoryManager::MemoryMapper
{
size_t allocated_memory = 0;
MemoryTracker memory_tracker{VariableContext::Global};
llvm::sys::MemoryBlock allocateMappedMemory(llvm::SectionMemoryManager::AllocationPurpose /*purpose*/,
size_t num_bytes,
const llvm::sys::MemoryBlock * const near_block,
unsigned flags,
std::error_code & EC) override
std::error_code & error_code) override
{
allocated_memory += num_bytes;
return llvm::sys::Memory::allocateMappedMemory(num_bytes, near_block, flags, EC);
memory_tracker.alloc(num_bytes);
return llvm::sys::Memory::allocateMappedMemory(num_bytes, near_block, flags, error_code);
}
std::error_code protectMappedMemory(const llvm::sys::MemoryBlock & block, unsigned flags) override
@ -183,13 +184,14 @@ struct CountingMMapper final : public llvm::SectionMemoryManager::MemoryMapper
std::error_code releaseMappedMemory(llvm::sys::MemoryBlock & block) override
{
allocated_memory -= block.size();
memory_tracker.free(block.size());
return llvm::sys::Memory::releaseMappedMemory(block);
}
};
struct LLVMContext
{
static inline std::atomic<size_t> id_counter{0};
llvm::LLVMContext context;
#if LLVM_VERSION_MAJOR >= 7
llvm::orc::ExecutionSession execution_session;
@ -205,6 +207,7 @@ struct LLVMContext
llvm::DataLayout layout;
llvm::IRBuilder<> builder;
std::unordered_map<std::string, void *> symbols;
size_t id;
LLVMContext()
#if LLVM_VERSION_MAJOR >= 7
@ -226,15 +229,16 @@ struct LLVMContext
, compile_layer(object_layer, llvm::orc::SimpleCompiler(*machine))
, layout(machine->createDataLayout())
, builder(context)
, id(id_counter++)
{
module->setDataLayout(layout);
module->setTargetTriple(machine->getTargetTriple().getTriple());
}
size_t finalize()
void finalize()
{
if (!module->size())
return 0;
return;
llvm::PassManagerBuilder builder;
llvm::legacy::PassManager mpm;
llvm::legacy::FunctionPassManager fpm(module.get());
@ -283,8 +287,6 @@ struct LLVMContext
throw Exception("Function " + name + " failed to link", ErrorCodes::CANNOT_COMPILE_CODE);
symbols[name] = reinterpret_cast<void *>(*address);
}
return memory_mapper->allocated_memory;
}
};
@ -322,7 +324,7 @@ public:
reinterpret_cast<void (*) (size_t, ColumnData *)>(function)(block_size, columns.data());
}
block.getByPosition(result).column = std::move(col_res);
};
}
};
static void compileFunction(std::shared_ptr<LLVMContext> & context, const IFunctionBase & f)
@ -565,6 +567,23 @@ static bool isCompilable(llvm::IRBuilderBase & builder, const IFunctionBase & fu
return function.isCompilable();
}
size_t CompiledExpressionCache::weight() const
{
std::lock_guard<std::mutex> lock(mutex);
size_t result{0};
std::unordered_set<size_t> seen;
for (const auto & cell : cells)
{
auto function_context = cell.second.value->getContext();
if (!seen.count(function_context->id))
{
result += function_context->memory_mapper->memory_tracker.get();
seen.insert(function_context->id);
}
}
return result;
}
void compileFunctions(ExpressionActions::Actions & actions, const Names & output_columns, const Block & sample_block, std::shared_ptr<CompiledExpressionCache> compilation_cache)
{
struct LLVMTargetInitializer
@ -647,13 +666,20 @@ void compileFunctions(ExpressionActions::Actions & actions, const Names & output
std::shared_ptr<LLVMFunction> fn;
if (compilation_cache)
{
bool success;
auto set_func = [&fused, i, context, &sample_block] () { return std::make_shared<LLVMFunction>(fused[i], context, sample_block); };
std::tie(fn, std::ignore) = compilation_cache->getOrSet(fused[i], set_func);
Stopwatch watch;
std::tie(fn, success) = compilation_cache->getOrSet(fused[i], set_func);
if (success)
ProfileEvents::increment(ProfileEvents::CompileExpressionsMicroseconds, watch.elapsedMicroseconds());
}
else
{
Stopwatch watch;
fn = std::make_shared<LLVMFunction>(fused[i], context, sample_block);
ProfileEvents::increment(ProfileEvents::CompileExpressionsMicroseconds, watch.elapsedMicroseconds());
}
actions[i].function = fn;
actions[i].argument_names = fn->getArgumentNames();
actions[i].is_function_compiled = true;
@ -666,9 +692,7 @@ void compileFunctions(ExpressionActions::Actions & actions, const Names & output
fused[*dep].insert(fused[*dep].end(), fused[i].begin(), fused[i].end());
}
size_t used_memory = context->finalize();
ProfileEvents::increment(ProfileEvents::CompiledCacheSizeBytes, used_memory);
context->finalize();
}
}

View File

@ -7,10 +7,13 @@
#include <Functions/IFunction.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
#include <Common/LRUCache.h>
#include <set>
namespace DB
{
struct LLVMContext;
using CompilableExpression = std::function<llvm::Value * (llvm::IRBuilderBase &, const ValuePlaceholders &)>;
@ -22,7 +25,6 @@ class LLVMFunction : public IFunctionBase
std::shared_ptr<LLVMContext> context;
std::vector<FunctionBasePtr> originals;
std::unordered_map<StringRef, CompilableExpression> subexpressions;
public:
LLVMFunction(const ExpressionActions::Actions & actions, std::shared_ptr<LLVMContext> context, const Block & sample_block);
@ -51,8 +53,23 @@ public:
bool hasInformationAboutMonotonicity() const override;
Monotonicity getMonotonicityForRange(const IDataType & type, const Field & left, const Field & right) const override;
std::shared_ptr<LLVMContext> getContext() const { return context; }
};
/** This child of LRUCache breaks one of its invariants: total weight may be changed after insertion.
* We have to do so, because we don't know the real memory consumption of generated LLVM code for every function.
*/
class CompiledExpressionCache : public LRUCache<std::vector<ExpressionAction>, LLVMFunction, ActionsHash>
{
private:
/// Shorthand for the base cache type, used to pull in its constructors below.
using Base = LRUCache<std::vector<ExpressionAction>, LLVMFunction, ActionsHash>;
public:
/// Inherit the constructors of the base cache unchanged.
using Base::Base;
/// Declared here only; the definition is provided outside the class body.
size_t weight() const;
};
/// For each APPLY_FUNCTION action, try to compile the function to native code; if the only uses of a compilable
/// function's result are as arguments to other compilable functions, inline it and leave the now-redundant action as-is.