#pragma once

#include <Core/BackgroundSchedulePool.h>
#include <Core/NamesAndTypes.h>
#include <Storages/IStorage.h>

#include <Poco/Event.h>

#include <atomic>
#include <mutex>
#include <thread>


namespace Poco { class Logger; }

namespace DB
{

/** During insertion, buffers the data in the RAM until certain thresholds are exceeded.
  * When thresholds are exceeded, flushes the data to another table.
  * When reading, it reads both from its buffers and from the subordinate table.
  *
  * The buffer is a set of num_shards blocks.
  * When writing, select the block number by the remainder of the `ThreadNumber` division by `num_shards` (or one of the others),
  *  and add rows to the corresponding block.
  * When using a block, it is locked by some mutex. If during write the corresponding block is already occupied
  *  - try to lock the next block in a round-robin fashion, and so no more than `num_shards` times (then wait for lock).
  * Thresholds are checked on insertion, and, periodically, in the background thread (to implement time thresholds).
  * Thresholds act independently for each shard. Each shard can be flushed independently of the others.
  * If a block is inserted into the table, which itself exceeds the max-thresholds, it is written directly to the subordinate table without buffering.
  * Thresholds can be exceeded. For example, if max_rows = 1 000 000, the buffer already had 500 000 rows,
  *  and a part of 800 000 rows is added, then there will be 1 300 000 rows in the buffer, and then such a block will be written to the subordinate table.
  *
  * There are also separate thresholds for flush, those thresholds are checked only for non-direct flush.
  * This maybe useful if you do not want to add extra latency for INSERT queries,
  * so you can set max_rows=1e6 and flush_rows=500e3, then each 500e3 rows buffer will be flushed in background only.
  *
  * When you destroy a Buffer table, all remaining data is flushed to the subordinate table.
  * The data in the buffer is not replicated, not logged to disk, not indexed. With a rough restart of the server, the data is lost.
  */
class StorageBuffer final : public IStorage, WithContext
{
/// BufferSource/BufferSink read/write the private per-shard buffers directly.
friend class BufferSource;
friend class BufferSink;

public:
    /// One set of flush criteria (used for min, max and background-flush thresholds).
    struct Thresholds
    {
        time_t time = 0;  /// The number of seconds from the insertion of the first row into the block.
        size_t rows = 0;  /// The number of rows in the block.
        size_t bytes = 0; /// The number of (uncompressed) bytes in the block.
    };

    /** num_shards - the level of internal parallelism (the number of independent buffers)
      * The buffer is flushed if all minimum thresholds or at least one of the maximum thresholds are exceeded.
      */
    StorageBuffer(
        const StorageID & table_id_,
        const ColumnsDescription & columns_,
        const ConstraintsDescription & constraints_,
        const String & comment,
        ContextPtr context_,
        size_t num_shards_,
        const Thresholds & min_thresholds_,
        const Thresholds & max_thresholds_,
        const Thresholds & flush_thresholds_,
        const StorageID & destination_id,
        bool allow_materialized_);

    std::string getName() const override { return "Buffer"; }

    QueryProcessingStage::Enum
    getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override;

    void read(
        QueryPlan & query_plan,
        const Names & column_names,
        const StorageSnapshotPtr & storage_snapshot,
        SelectQueryInfo & query_info,
        ContextPtr context,
        QueryProcessingStage::Enum processed_stage,
        size_t max_block_size,
        size_t num_streams) override;

    bool supportsParallelInsert() const override { return true; }

    bool supportsSubcolumns() const override { return true; }

    SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context, bool /*async_insert*/) override;

    void startup() override;

    /// Flush all buffers into the subordinate table and stop background thread.
    void flushAndPrepareForShutdown() override;

    bool optimize(
        const ASTPtr & query,
        const StorageMetadataPtr & metadata_snapshot,
        const ASTPtr & partition,
        bool final,
        bool deduplicate,
        const Names & deduplicate_by_columns,
        bool cleanup,
        ContextPtr context) override;

    bool supportsSampling() const override { return true; }
    bool supportsPrewhere() const override;
    bool supportsFinal() const override { return true; }

    void checkAlterIsPossible(const AlterCommands & commands, ContextPtr context) const override;

    /// The structure of the subordinate table is not checked and does not change.
    void alter(const AlterCommands & params, ContextPtr context, AlterLockHolder & table_lock_holder) override;

    std::optional<UInt64> totalRows(const Settings & settings) const override;
    std::optional<UInt64> totalBytes(const Settings & settings) const override;

    /// Lifetime counters are served from lifetime_writes, accumulated on every insert.
    std::optional<UInt64> lifetimeRows() const override { return lifetime_writes.rows; }
    std::optional<UInt64> lifetimeBytes() const override { return lifetime_writes.bytes; }

private:
    /// One independent shard of buffered data, protected by its own mutex.
    struct Buffer
    {
        time_t first_write_time = 0; /// When the first row was inserted; 0 while the buffer is empty.
        Block data;                  /// The buffered rows themselves.

        /// Schema version, checked to avoid mixing blocks with different sets of columns, from
        /// before and after an ALTER. There are some remaining mild problems if an ALTER happens
        /// in the middle of a long-running INSERT:
        ///  * The data produced by the INSERT after the ALTER is not visible to SELECTs until flushed.
        ///    That's because BufferSource skips buffers with old metadata_version instead of converting
        ///    them to the latest schema, for simplicity.
        ///  * If there are concurrent INSERTs, some of which started before the ALTER and some started
        ///    after, then the buffer's metadata_version will oscillate back and forth between the two
        ///    schemas, flushing the buffer each time. This is probably fine because long-running INSERTs
        ///    usually don't produce lots of small blocks.
        int32_t metadata_version = 0;

        std::unique_lock<std::mutex> lockForReading() const;
        std::unique_lock<std::mutex> lockForWriting() const;
        std::unique_lock<std::mutex> tryLock() const;

    private:
        mutable std::mutex mutex;

        /// Common implementation behind lockForReading/lockForWriting/tryLock.
        std::unique_lock<std::mutex> lockImpl(bool read) const;
    };

    /// There are `num_shards` of independent buffers.
    const size_t num_shards;
    std::vector<Buffer> buffers;

    const Thresholds min_thresholds;
    const Thresholds max_thresholds;
    const Thresholds flush_thresholds; /// Checked only for non-direct (background) flush, see class comment.

    StorageID destination_id; /// The subordinate table that buffered data is flushed to.
    bool allow_materialized;  /// Whether MATERIALIZED columns may be inserted explicitly.

    /// Atomic row/byte counters (safe to update from concurrent inserts/flushes).
    struct Writes
    {
        std::atomic<size_t> rows = 0;
        std::atomic<size_t> bytes = 0;
    };
    Writes lifetime_writes; /// Ever-growing totals, exposed via lifetimeRows()/lifetimeBytes().
    Writes total_writes;    /// NOTE(review): presumably the amount currently buffered — confirm against the .cpp.

    Poco::Logger * log;

    void flushAllBuffers(bool check_thresholds = true);
    /// Returns whether the buffer was actually flushed. `locked` means the caller already holds the buffer's lock.
    bool flushBuffer(Buffer & buffer, bool check_thresholds, bool locked = false);
    /// `direct` selects max- vs flush-thresholds; additional_* account for rows/bytes about to be inserted.
    bool checkThresholds(const Buffer & buffer, bool direct, time_t current_time, size_t additional_rows = 0, size_t additional_bytes = 0) const;
    bool checkThresholdsImpl(bool direct, size_t rows, size_t bytes, time_t time_passed) const;

    /// `table` argument is passed, as it is sometimes evaluated beforehand. It must match the `destination`.
    void writeBlockToDestination(const Block & block, StoragePtr table);

    /// Periodic task body that flushes buffers by time thresholds.
    void backgroundFlush();
    /// Re-arms flush_handle for the next run; exact delay policy lives in the .cpp.
    void reschedule();

    StoragePtr getDestinationTable() const;

    BackgroundSchedulePool & bg_pool;
    BackgroundSchedulePoolTaskHolder flush_handle; /// The scheduled background-flush task in bg_pool.
};

}
|