ClickHouse/docs/ja/development/architecture.md
2024-11-18 11:58:58 +09:00

48 KiB
Raw Blame History

slug sidebar_label sidebar_position
/ja/development/architecture アーキテクチャ概要 62

ClickHouseアーキテクチャの概要

ClickHouseは真の列指向DBMSです。データはカラム単位で保存され、配列ベクトルまたはカラムのチャンクの実行中に処理されます。 可能な限り、個々の値ではなく配列に対する操作が行われます。これを「ベクトル化クエリ実行」と呼び、実際のデータ処理のコストを下げるのに役立ちます。

このアイデアは新しいものではありません。APL1957年のプログラミング言語に遡り、その後継としてA +APL方言J1990K1993、およびQKx Systemsのプログラミング言語、2003があります。配列プログラミングは科学データ処理で使用されています。関係データベースでも新しいアイデアではありません例えば、VectorWiseシステムActian CorporationによるActian Vector Analytical Databaseとしても知られていますで使用されています。

クエリ処理を加速するためのアプローチには、ベクトル化クエリ実行と実行時コード生成の2つがあります。後者はすべての間接参照と動的ディスパッチを取り除きます。これらの方法のいずれも、もう一方よりも必ずしも優れているわけではありません。実行時コード生成は多くの操作を融合し、CPUの実行ユニットとパイプラインを完全に活用する場合により効果的です。ベクトル化クエリ実行は一時的なベクトルをキャッシュに書き込み、それを読み戻す必要があるため、実用的でない場合があります。一時データがL2キャッシュに収まらない場合、これが問題になります。しかし、ベクトル化クエリ実行は、CPUのSIMD機能をより簡単に利用できます。友人による研究論文によれば、両方のアプローチを組み合わせる方が良いと示されています。ClickHouseはベクトル化クエリ実行を使用し、実行時コード生成のための限定的な初期サポートがあります。

カラム

IColumnインターフェースは、メモリ内のカラム(実際にはカラムのチャンク)を表すために使用されます。このインターフェースは、さまざまな関係演算子の実装を支援するメソッドを提供します。ほぼすべての操作はイミュータブルです:元のカラムを変更せず、新しいものを作成します。例えば、IColumn :: filterメソッドはフィルタバイトマスクを受け取ります。これはWHEREHAVINGの関係演算子で使用されます。追加の例:ORDER BYをサポートするためのIColumn :: permuteメソッド、LIMITをサポートするためのIColumn :: cutメソッド。

さまざまなIColumn実装(ColumnUInt8ColumnStringなど)は、カラムのメモリレイアウトを担当します。メモリレイアウトは通常、連続した配列です。カラムの整数型の場合、std :: vectorのように、ただ1つの連続した配列です。StringArrayカラムの場合、すべての配列要素を連続して配置する1つのベクトルと、各配列の先頭へのオフセットを持つ2番目のベクトルが存在します。また、ColumnConstはメモリに1つの値だけを保存し、カラムのように見えます。

フィールド

それでも、個々の値を操作することも可能です。個々の値を表すために、Fieldが使用されます。FieldUInt64Int64Float64String、およびArrayの区別された共用体に過ぎません。IColumnには、n番目の値をFieldとして取得するためのoperator []メソッドと、Fieldをカラムの末尾に追加するためのinsertメソッドがあります。これらのメソッドは、個々の値を表すテンポラリのFieldオブジェクトを扱う必要があるため、非常に効率的ではありません。もっと効率的なメソッドとしてinsertFrominsertRangeFromなどがあります。

Fieldには、特定のテーブルのデータ型に関する十分な情報はありません。例えば、UInt8UInt16UInt32、およびUInt64はすべてField内ではUInt64で表されます。

リーキー抽象

IColumnはデータの一般的な関係変換のメソッドを持っていますが、すべてのニーズを満たすわけではありません。例えば、ColumnUInt64には2つのカラムを合計するメソッドがなく、ColumnStringには部分文字列検索を実行するためのメソッドがありません。これらの無数のルーチンは、IColumnの外部で実装されています。

カラム上のさまざまな関数は、Field値を抽出するためにIColumnメソッドを使用して一般的に非効率的な方法で実装されるか、特定のIColumn実装内のデータの内部メモリレイアウトを知っていて特殊な方法で実装されます。これは特定のIColumn型にキャスト関数を実装し、内部表現を直接扱うことによって実現されます。例えば、ColumnUInt64には内部配列へのリファレンスを返すgetDataメソッドがあり、別のルーチンがその配列を直接読み取ったり埋めたりします。効率的な特殊化を可能にするための「リーキー抽象」があります。

データ型

IDataTypeはシリアル化および逆シリアル化に責任を負います。これにより、カラムのチャンクまたは個々の値をバイナリまたはテキスト形式で読み書きできます。IDataTypeはテーブル内のデータ型に直接対応しています。例えば、DataTypeUInt32DataTypeDateTimeDataTypeStringなどがあります。

IDataTypeIColumnは互いにゆるく関連しています。異なるデータ型が、同じIColumn実装によってメモリ上で表されることがあります。例えば、DataTypeUInt32DataTypeDateTimeは、どちらもColumnUInt32またはColumnConstUInt32で表されます。さらに、同じデータ型が異なるIColumn実装によって表されることもあります。例えば、DataTypeUInt8ColumnUInt8ColumnConstUInt8によって表されることがあります。

IDataTypeはメタデータのみを保持します。例としては、DataTypeUInt8は何も保持しておらず(仮想ポインタvptrを除く)、DataTypeFixedStringN(固定サイズ文字列のサイズ)だけを保持します。

IDataTypeはさまざまなデータフォーマットのためのヘルパーメソッドを持っています。例として、クォートの可能性を考慮して値をシリアル化する方法、JSONのために値をシリアル化する方法、およびXML形式の一部として値をシリアル化する方法があります。データフォーマットとの直接の対応はありません。例えば、異なるデータフォーマットPrettyTabSeparatedは、IDataTypeインターフェースのserializeTextEscapedヘルパーメソッドを共有して使用します。

ブロック

Blockはメモリ内でテーブルのサブセットチャンクを表すコンテナです。それは単に3つ組IColumn, IDataType, カラム名)のセットです。クエリ実行中、データはBlockとして処理されます。Blockを持っている場合、データ(IColumnオブジェクト内に)も持ち、タイプに関する情報(IDataType内)があり、カラムがどのように処理されるかを教えます。そしてカラム名もあります。それは、もとのテーブルからのカラム名か、計算結果を一時的に取得するために割り当てられた人工的な名前です。

ブロック内でカラムに関数を計算する場合、結果を含む別のカラムをブロックに追加し、関数の引数用のカラムには触れません。操作はイミュータブルです。後で不要なカラムはブロックから削除できますが、変更はされません。これは共通部分式の消去に便利です。

ブロックは処理されたデータチャンクごとに作成されます。注意すべきは、同じタイプの計算では異なるブロックでもカラム名とタイプが同じであるため、カラムデータだけが変化します。ブロックデータはブロックヘッダーから分離する方が良いです。小さなブロックサイズは、shared_ptrのコピーやカラム名の一時文字列のオーバーヘッドが高いです。

プロセッサ

記述はhttps://github.com/ClickHouse/ClickHouse/blob/master/src/Processors/IProcessor.hを参照してください。

フォーマット

データフォーマットはプロセッサで実装されています。

I/O

バイト指向の入出力には、ReadBufferWriteBufferの抽象クラスがあります。これらはC++のiostreamの代わりに使用されます。心配しないでください成熟したC++プロジェクトは、良い理由で常にiostream以外の何かを使用しています。

ReadBufferWriteBufferは、単なる連続したバッファとそのバッファ内の位置を指すカーソルです。実装はバッファのメモリを保持する場合としない場合があります。バッファを次のデータで満たすため(ReadBufferの場合)またはどこかにバッファをフラッシュするため(WriteBufferの場合)の仮想メソッドがあります。仮想メソッドはあまり呼ばれません。

ReadBuffer/WriteBufferの実装はファイルとファイル記述子、ネットワークソケットで動作するためや、圧縮を実装するために使用されます(CompressedWriteBufferは別のWriteBufferで初期化され、そこにデータを書き込む前に圧縮を行います他の目的のために使用されます名前ConcatReadBufferLimitReadBuffer、およびHashingWriteBufferはそれ自体を物語っています。

Read/WriteBuffersはバイトを扱うだけです。入力/出力のフォーマットを支援するために、ReadHelpersおよびWriteHelpersヘッダーファイルからの関数があります。例えば、小数形式で数値を書くためのヘルパーがあります。

結果セットをJSON形式で標準出力に書き込むときに何が起こるかを見てみましょう。 結果セットは、プルクエリパイプラインからフェッチされる準備が整っています。 まず、標準出力にバイトを書き込むためにWriteBufferFromFileDescriptor(STDOUT_FILENO)を作成します。 次に、クエリパイプラインの結果をJSON形式で標準出力に行を出力するJSONRowOutputFormatに接続します。 これはcompleteメソッドを介して行うことができ、プルクエリパイプラインを完了したクエリパイプラインに変換します。 内部的には、JSONRowOutputFormatはさまざまなJSONデリミタを書き込み、IDataType::serializeTextJSONメソッドをIColumnと行番号の参照として呼び出します。したがって、IDataType::serializeTextJSONWriteHelpers.hからのメソッドを呼び出します:例えば数値タイプのwriteTextDataTypeStringwriteJSONStringです。

テーブル

IStorageインターフェースはテーブルを表します。そのインターフェースの異なる実装は異なるテーブルエンジンです。例としては、StorageMergeTreeStorageMemoryなどです。これらのクラスのインスタンスは、ただテーブルを表します。

IStorageの主要なメソッドは、readwriteです。他にもalterrenamedropがあります。readメソッドは次の引数を受け取ります:テーブルから読み取るカラムのセット、考慮するASTクエリ、および望ましいストリームの数です。そしてPipeを返します。

ほとんどの場合、読み取りメソッドはテーブルから指定されたカラムを読むだけの責任があります。それ以外のデータ処理はパイプラインの他の部分で行われ、IStorageの責任範囲外です。

しかし、顕著な例外があります:

  • ASTクエリはreadメソッドに渡され、テーブルエンジンがインデックス使用を推論し、テーブルからより少ないデータを読み取るために使用できます。
  • 時には、テーブルエンジンが特定のステージまでデータを自分で処理することができます。例えば、StorageDistributedはリモートサーバにクエリを送り、データを処理して異なるリモートサーバからマージできるステージまで要求し、その前処理されたデータを返すことができます。その後、クエリインタプリタがデータ処理を完了します。

テーブルのreadメソッドは、通常は複数のProcessorで構成されるPipeを返します。これらのプロセッサは、並行してテーブルを読むことができます。 次に、これらのプロセッサをさまざまな他の変換(式の評価やフィルタリングなど)に接続できます。これらは独立して計算されます。 そして、それらの上にQueryPipelineを作成し、PipelineExecutorを介して実行します。

また、TableFunctionがあります。これらはFROM句で使用するために一時的なIStorageオブジェクトを返す関数です。

テーブルエンジンの実装方法を簡単に知るためには、StorageMemoryStorageTinyLogのような単純なものを見てください。

IStoragereadメソッドの結果として、QueryProcessingStage—ストレージ内で既に計算されたクエリの部分についての情報を返します。

パーサ

クエリは手書きの再帰型下降法のパーサによって解析されます。例えば、ParserSelectQueryはクエリのさまざまな部分のための基礎的なパーサを再帰的に呼び出します。パーサはASTを生成します。ASTはノードとして表現され、これはIASTのインスタンスです。

パーサジェネレータは歴史的な理由から使用されていません。

インタプリタ

インタプリタは、ASTからクエリ実行パイプラインを作成する責任があります。単純なインタプリタInterpreterExistsQueryInterpreterDropQuery)と、より高度なInterpreterSelectQueryがあります。

クエリ実行パイプラインは、特定のタイプの列セットのチャンクを消費し、生成できるプロセッサの組み合わせです。 プロセッサはポートを介して通信し、複数の入力ポートと複数の出力ポートを持つことがあります。 詳細な説明はsrc/Processors/IProcessor.hにあります。

例えば、SELECTクエリのインタプリタの結果は、結果セットから読み取る特別な出力ポートがある「プルクエリパイプライン」です。 INSERTクエリの結果は、挿入用のデータを書き込む入力ポートを持つ「プッシュクエリパイプライン」です。 INSERT SELECTクエリのインタプリタの結果は、入力または出力を持たず、同時にSELECTからINSERTにデータをコピーする「完了したクエリパイプライン」です。

InterpreterSelectQueryはクエリアナリシスと変換のためにExpressionAnalyzerExpressionActionsの仕組みを使用します。これは、ほとんどのルールベースのクエリ最適化が行われる場所です。ExpressionAnalyzerはかなり複雑で、クエリ変換と最適化を個別のクラスに抽出し、クエリのモジュール変換を可能にするために書き直す必要があります。

インタプリタに存在する問題を解決するために、新しいInterpreterSelectQueryAnalyzerが開発されました。これはExpressionAnalyzerを使用しないInterpreterSelectQueryの新しいバージョンであり、ASTQueryPipelineの間にQueryTreeという追加の抽象化層を導入します。これはプロダクションでの使用に完全に準備されていますが、念のためそのenable_analyzer設定をfalseに設定することでオフにすることができます。

関数

通常の関数と集約関数があります。集約関数については次のセクションを参照してください。

通常の関数は行数を変更しません。各行を独立して処理するように動作します。実際には、関数は個々の行のためではなく、ベクトル化クエリ実行を実現するためにBlockのデータのために呼び出されます。

いくつかの雑多な関数があります。blockSizerowNumberInBlock、およびrunningAccumulateなどがあり、ブロック処理を利用し、行の独立性に違反します。

ClickHouseは強力な型付けを持っているため、暗黙の型変換はありません。関数が特定の型の組み合わせをサポートしていない場合、例外をスローします。しかし、関数は多くの異なる型の組み合わせに対してオーバーロードされて動作することがあります。例えば、plus関数(+演算子を実装するための)は、任意の数値型の組み合わせで動作します:UInt8 + Float32UInt16 + Int8など。また、いくつかの可変的な関数は任意の数の引数を受け取ります。例えばconcat関数です。

関数を実装することはやや不便かもしれません。なぜなら、関数はサポートされるデータ型とサポートされるIColumnsを明示的にディスパッチするためです。例えば、plus関数は各数値型の組み合わせと定数または非定数の左および右の引数に対してC++テンプレートのインスタンス化によるコード生成を持っています。

ランタイムコード生成を実装してテンプレートコードの肥大化を避けるには優れた場所です。また、乗算加算を融合させた関数や一つのループ反復で複数の比較を行うことが可能になります。

ベクトル化クエリ実行のため、関数はショートサーキットされません。例えば、WHERE f(x) AND g(y)と書くと、両方の側が計算され、f(x)がゼロである行でも計算されます(ただし、f(x)がゼロ定数式でない限り)。しかし、f(x)条件の選択性が高く、f(x)の計算がg(y)よりもはるかに安い場合、マルチパス計算を実装した方が良いです。最初にf(x)を計算し、次にその結果でカラムをフィルタリングし、フィルタリングされた小さいデータチャンクのためにのみg(y)を計算します。

集約関数

集約関数は状態を保持する関数です。渡された値をいくつかの状態に蓄積し、その状態から結果を取得することができます。これらはIAggregateFunctionインターフェースで管理されます。状態は非常に単純(AggregateFunctionCountの状態は単一のUInt64値です)な場合もありますし、非常に複雑(AggregateFunctionUniqCombinedの状態は線形配列、ハッシュテーブル、そしてHyperLogLog確率データ構造の組み合わせです)な場合もあります。

状態はArena(メモリプール)に割り当てられます。これは高いカーディナリティのGROUP BYクエリを実行する際に複数の状態を処理するためです。状態には非自明なコンストラクタとデストラクタがある場合があります。例えば、複雑な集約状態は自分自身で追加のメモリを割り当てることができます。これは、状態を作成し破棄する際の注意が必要であり、所有権の適切な引き継ぎと破棄順序が必要です。

集約状態は、分散クエリ実行中にネットワークを介して渡したり、RAMが不足してディスクに書き込むためにシリアル化および逆シリアル化することができます。それらは、DataTypeAggregateFunctionを持つテーブルに保存してデータのインクリメンタル集約を可能にすることもできます。

集約関数状態のシリアル化されたデータ形式は、現在バージョン管理されていません。集約状態が一時的にしか保持されない場合、これは問題ではありません。しかし、私たちにはインクリメンタル集約のためのAggregatingMergeTreeテーブルエンジンがあり、すでにプロダクションで使用されています。それが、将来的にシリアル化形式を変更する際に、このような形式互換性が必要になる理由です。

サーバ

サーバは異なるインターフェースを実装しています:

  • 任意の外部クライアント用のHTTPインターフェース。
  • ネイティブなClickHouseクライアントや分散クエリ実行中のクロスサーバ通信用のTCPインターフェース。
  • レプリケーションのためのデータ転送用のインターフェース。

内部的には、ただのプリミティブなマルチスレッドサーバで、コルーチンやファイバーはありません。サーバは単純なクエリを高率に処理するためではなく、比較的少ない量の複雑なクエリを処理し、それぞれが分析のために膨大なデータを処理できるように設計されています。

サーバはクエリ実行に必要な環境を持つContextクラスを初期化します:利用可能なデータベースのリスト、ユーザーとアクセス権限、設定、クラスタ、プロセスリスト、クエリログなどです。インタプリタはこの環境を使用します。

サーバTCPプロトコルは、後方および前方互換性を完全に維持します古いクライアントは新しいサーバと通信でき、新しいクライアントは古いサーバと通信できます。しかし、永遠にそれを維持し続けたくはないので、私たちは約1年後に古いバージョンのサポートを削除しています。

:::note ほとんどの外部アプリケーションに対しては、HTTPインターフェースの使用をお勧めします。これはシンプルで使いやすいです。TCPプロトコルは内部データ構造とより厳密にリンクされており、データブロックを渡すために内部形式を使用し、圧縮データの独自のフレームを使用します。このプロトコルのCライブラリをリリースしていません。これはClickHouseのコードベースのほとんどをリンクする必要があるため、実用的ではありません。 :::

設定

ClickHouseサーバはPOCO C++ライブラリをベースにしており、その構成を表すためにPoco::Util::AbstractConfigurationを使用します。構成はPoco::Util::ServerApplicationクラスによって保持され、これをDaemonBaseクラスが継承し、それがさらにDB::Serverクラスにより継承され、clickhouse-server自体を実装しています。そのため、構成はServerApplication::config()メソッドでアクセスできます。

構成は複数のファイルからXMLまたはYAML形式で読み取られ、ConfigProcessorクラスによって単一のAbstractConfigurationにマージされます。構成はサーバの起動時に読み込まれ、その後、構成ファイルの更新、削除または追加がある場合には再読み込みされます。ConfigReloaderクラスは、これらの変更の定期的な監視と再読み込み手続きを担当します。SYSTEM RELOAD CONFIGクエリも構成を再読み込みさせるトリガーになります。

Server以外のクエリやサブシステムには構成がContext::getConfigRef()メソッドを使ってアクセス可能です。サーバを再起動せずに構成をリロードできるサブシステムは、Server::main()メソッド内のリロードコールバックに登録する必要があります。新しい構成にエラーがある場合、多くのサブシステムは新しい構成を無視し、警告メッセージをログに記録し、以前に読み込まれた構成で動作を続行します。AbstractConfigurationの性質上、特定のセクションへの参照を渡すことはできないため、通常はString config_prefixが使用されます。

スレッドとジョブ

クエリを実行し、副作用を行うためにClickHouseは、スレッドプールからスレッドを割り当て、頻繁なスレッドの作成と破棄を避けます。目的とジョブの構造に応じて選ばれるいくつかのスレッドプールがあります

  • クライアントセッションのためのサーバプール。
  • 一般的なジョブ、バックグラウンド活動、スタンドアロンのスレッドのためのグローバルスレッドプール。
  • 主に何らかのIOでブロックされCPU負荷が少ないジョブのためのIOスレッドプール。
  • 定期的なタスクのためのバックグラウンドプール。
  • ステップに分割できるプリエンプタブルなタスクのためのプール。

サーバプールはServer::main()メソッドで定義されたPoco::ThreadPoolクラスのインスタンスです。最大でmax_connectionスレッドを持つことができます。それぞれのスレッドは1つのアクティブな接続に専念しています。

グローバルスレッドプールはGlobalThreadPoolシングルトンクラスです。これからスレッドを割り当てるにはThreadFromGlobalPoolを使用します。これはstd::threadに似たインターフェースを持っており、グローバルプールからスレッドをプルし、必要な初期化をすべて行います。以下の設定で構成されます:

  • max_thread_pool_size - プール内スレッド数の制限。
  • max_thread_pool_free_size - 新しいジョブを待つアイドルスレッド数の制限。
  • thread_pool_queue_size - スケジュールされたジョブ数の制限。

グローバルプールはユニバーサルであり、以下に説明するすべてのプールはその上に実装されています。これはプールの階層として考えることができます。特定のプールはThreadPoolクラスを使用してグローバルプールからそのスレッドを取得します。そのため、特定のプールの主な目的は、同時ジョブの数に制限を適用し、ジョブのスケジュールを行うことです。スレッドプールのスレッド数より多くのジョブがスケジュールされている場合は、ThreadPoolが優先順位付きのキューにジョブを蓄積します。それぞれのジョブには整数の優先度があります。デフォルトの優先度はゼロです。優先度の値が高いジョブは、優先度の値が低いジョブより先に開始されます。しかし、すでに実行されているジョブの間には違いはなく、そのため優先度は主にプールが過負荷状態であるときに重要です。

IOスレッドプールは、IOThreadPool::get()メソッドでアクセス可能なプレーンなThreadPoolとして実装されています。これは、max_io_thread_pool_sizemax_io_thread_pool_free_size、およびio_thread_pool_queue_size設定と同様に構成されます。IOスレッドプールの主な目的は、グローバルプールをIOジョブで消耗させないようにし、それがクエリのためにCPUを完全に活用できないことを避けることです。S3へのバックアップは多くのIO操作を行うため、インタラクティブなクエリへの影響を避けるために別のBackupsIOThreadPoolがあり、max_backups_io_thread_pool_sizemax_backups_io_thread_pool_free_size、およびbackups_io_thread_pool_queue_size設定で構成されます。

定期的なタスク実行のためにBackgroundSchedulePoolクラスがあります。BackgroundSchedulePool::TaskHolderオブジェクトを使用してタスクを登録することができ、プールは2つのジョブが同時に実行されないことを保証します。タスク実行を特定の将来の瞬間に延期したり、一時的にタスクを非アクティブ化することもできます。グローバルContextはこれを異なる目的でいくつかのインスタンスを提供します。一般的な目的のタスクにはContext::getSchedulePool()が使用されます。

プリエンプタブルなタスクのための特化したスレッドプールもあります。そのようなIExecutableTaskタスクは、ステップと呼ばれるジョブの順序付けられたシーケンスに分割できます。短いタスクが長いタスクより優先されるようにスケジュールするためにMergeTreeBackgroundExecutorが使用されます。名前が示すように、バックグラウンドでのMergeTree関連の操作、例えばマージやミューテーション、フェッチ、ムーブのために使用されます。プールのインスタンスはContext::getCommonExecutor()およびそれに類似するメソッドで使用可能です。

どのプールをジョブに使用するかに関係なく、開始時にこのジョブのためにThreadStatusインスタンスが作成されます。これにはすべてのスレッド固有の情報スレッドID、クエリID、パフォーマンスカウンター、リソース消費量などが含まれます。ジョブはローカルスレッドポインタによってCurrentThread::get()呼び出しでこれにアクセスします。そのため、すべての関数にこれを渡す必要はありません。

スレッドがクエリ実行に関連している場合、ThreadStatusに添付される最も重要なものはクエリコンテキストContextPtrです。すべてのクエリには、サーバプール内のマスタースレッドがあります。マスタースレッドはThreadStatus::QueryScope query_scope(query_context)オブジェクトを保持することでアタッチメンを行います。マスタースレッドはまた、ThreadGroupStatusオブジェクトで表されるスレッドグループを作成します。このクエリ実行中に割り当てられるすべての追加スレッドはCurrentThread::attachTo(thread_group)呼び出しでそのスレッドグループに添付されます。スレッドグループはプロファイルイベントカウンタの集約し、単一タスクに専用された全スレッドによるメモリ消費を追跡するのに使用されます(詳細はMemoryTrackerおよびProfileEvents::Countersクラスを参照してください)。

同時実行制御

並列実行可能なクエリは、max_threads設定を使用して制限されます。この設定のデフォルト値は、単一クエリがすべてのCPUコアを最大限に活用できるように選択されています。しかし、複数の同時実行クエリがあり、それぞれがデフォルトのmax_threads設定値を使用する場合はどうでしょうかその場合、クエリはCPUリソースを共有します。OSは、新規スレッドを頻繁に切り替えることで公平性を保証しますが、これにはそれなりのパフォーマンスペナルティがあります。ConcurrencyControlはこのペナルティを処理し、多くのスレッドの割り当てを避けることに役立ちます。スロットの概念が導入されています。スロットは同時実行性の単位ですスレッドクエリを実行するためには、事前にスロットを取得し、スレッドを停止したらそれを解放する必要があります。スロットの数はサーバで全体に制限されています。合計スロット数を超えた場合には、複数の同時実行クエリがCPUスロットを競争します。ConcurrencyControlは、この競争を公平に解決するためにCPUスロットのスケジューリングを行う責任があります。

それぞれのスロットは、以下の状態を持つ独立した状態機械として見ることができます:

  • free: スロットは任意のクエリが割り当てるために利用可能です。
  • granted: スロットは特定のクエリによってallocatedされますが、まだスレッドに獲得されていません。
  • acquired: スロットは特定のクエリによってallocatedされ、スレッドによって獲得されます。

注:allocatedされたスロットはgrantedacquiredの2つの異なる状態にあることがあります。前者は移行状態であり、実際にはスロットがクエリに割り当てられた瞬間からそのクエリのいずれかのスレッドによってアップスケーリング手続きが実行される瞬間までの短い状態です。

stateDiagram-v2
    direction LR
    [*] --> free
    free --> allocated: allocate
    state allocated {
        direction LR
        [*] --> granted
        granted --> acquired: acquire
        acquired --> [*]
    }
    allocated --> free: release

ConcurrencyControlのAPIは以下の機能で構成されています

  1. クエリのためのリソース割り当てを作成:auto slots = ConcurrencyControl::instance().allocate(1, max_threads);。少なくとも1つのスロットを割り当て、最大でmax_threadsのスロットを割り当てます。最初のスロットは即座に許可されますが、残りのスロットは後で許可される場合があります。そのため限度は柔軟です。なぜなら、すべてのクエリは少なくとも1つのスレッドを取得します。
  2. それぞれのスレッドは割り当てからスロットを獲得する必要があります:while (auto slot = slots->tryAcquire()) spawnThread([slot = std::move(slot)] { ... });
  3. スロットの全体数を更新します:ConcurrencyControl::setMaxConcurrency(concurrent_threads_soft_limit_num)。サーバ再起動せずに実行中に行うことができます。

このAPIにより、クエリは少なくとも1つのスレッドで開始しCPU圧力がある場合、その後max_threadsまでスケールアップできます。

分散クエリ実行

クラスターセットアップでのサーバーは主に独立しています。クラスターの1つまたはすべてのサーバーにDistributedテーブルを作成できます。Distributedテーブルはデータ自体を保存せず、クラスター内の複数ノードのすべてのローカルテーブルへの「ビュー」を提供します。DistributedテーブルからSELECTすると、そのクエリを再書き込みし、負荷分散設定に応じてリモートードを選択し、クエリを送信します。Distributedテーブルは、リモートサーバにクエリを送信し、異なるサーバからの中間結果をマージできる段階まで処理させ、次に中間結果を受け取り、それをマージします。分散テーブルはできるだけ多くの作業をリモートサーバに分配し、ネットワーク上で中間データをあまり送信しません。

INまたはJOIN句内のサブクエリを持ち、それぞれがDistributedテーブルを使用する場合、物事はより複雑になります。これらのクエリの実行には異なる戦略があります。

分散クエリ実行のためのグローバルクエリプランはありません。各ノードはそのタスクのためのローカルクエリプランのみを持っています。私たちは、リモートノードにクエリを送信し、次に結果をマージするシンプルなワンパス分散クエリ実行のみを持っています。しかし、これだけでは、高カーディナリティのGROUP BYを持つ複雑なクエリやJOINのための多量の一時データを持つクエリには実現可能ではありません。そのような場合、サーバー間でデータを「再シャッフル」する必要があり、追加の調整が必要です。ClickHouseはそのようなクエリ実行をサポートしておらず、このための作業が必要です。

MergeTree

MergeTreeは主キーでインデックスを付けることをサポートするストレージエンジンのファミリーです。主キーはカラムまたは式の任意のタプルにすることができます。MergeTreeテーブル内のデータは「パーツ」に保存されます。各パーツは主キー順にデータを保存するため、データは主キータプルにより辞書順に整列されています。すべてのテーブルカラムは、これらのパーツの中の個別のcolumn.binファイルに保存されます。ファイルは圧縮されたブロックで構成されています。各ブロックは、通常、未圧縮データの64 KBから1 MBの範囲で、平均値サイズに依存します。ブロックはカラム値を連続して一つずつ配置されています。カラム値は各カラムで同じ順序にあり主キーが順序を定義しています、多くのカラムを反復するときに対応する行の値が得られます。

主キー自体は「スパース」です。これは、各行を指すのではなく、データのいくつかの範囲だけを指します。primary.idxと呼ばれる別のファイルには、N番目の行ごとに主キーの値が記録されています。ここでNはindex_granularity通常、N = 8192と呼ばれます。また、各カラムには、データファイル内の各N番目の行へのオフセットを持つcolumn.mrkファイルがあります。各マークはペアであり、ファイル内の圧縮ブロックの先頭へのオフセットと、デコンプレッションされたブロックにおけるデータの最初へのオフセットです。通常、圧縮ブロックはマークと整合されており、デコンプレッションされたブロック内のオフセットはゼロです。primary.idxのデータは常にメモリに存在し、column.mrkファイルのデータはキャッシュされます。

MergeTree内のパーツから何かを読み取ろうとすると、primary.idxデータを見て、要求されたデータを含む可能性のある範囲を見つけ、次にcolumn.mrkデータを見て、それらの範囲を読み始める場所のオフセットを計算します。スパースのため、余計なデータが読み取られるかもしれません。ClickHouseは、シンプルなポイントクエリの高負荷に適していません。なぜなら、各キーのためにindex_granularity行を持つ全範囲が読み取られる必要があり、各カラムのために圧縮ブロック全体がデコンプレッションされる必要があるからです。私たちは、単一のサーバで数兆行を管理し、索引のための目立たないメモリ消費を可能にするために索引をスパースにしました。また、主キーがスパースであるため、一意ではありませんテーブルへのINSERT時にはキーが存在するかどうかを確認できません。テーブル内に同じキーを持つ多くの行がある場合があります。

MergeTreeにデータを挿入すると、そのバッチが主キー順に並べ替えられ、新しいパーツを形成します。バックグラウンドスレッドがいくつかのパーツを選び、それらを単一のソートされたパーツに統合する作業を定期的に行い、パーツの数を比較的低く保ちます。これはMergeTreeと呼ばれる理由です。もちろん、マージは「書き込み増幅」につながります。すべてのパーツは不変であり作成され、削除されるだけで修正されません。SELECTが実行されるときは、テーブルのスナップショットパーツのセットが保持されます。マージ後、障害発生後のリカバリーを容易にするために一時的に古いパーツも保持されており、マージされたパーツが壊れていると見なされる場合は、そのソースパーツで置き換えることができます。

MergeTreeはLSMツリーではありません。なぜなら、MEMTABLEおよびLOGを含まないためです。挿入されたデータはファイルシステムに直接書き込みます。この動作により、MergeTreeはバッチでデータを挿入するのにより適します。したがって、頻繁に少量の行を挿入するのはMergeTreeには理想的ではありません。例えば、数行を1秒ごとに挿入することは可能ですが、1秒に千回行うことはMergeTreeに最適ではありません。しかし、小規模な挿入を克服するための非同期挿入モードがあります。私たちは、私たちのアプリケーションでバッチでデータをすでに挿入しているため、簡潔さのためにこの方法を取りました

背景合併中に追加作業を行うMergeTreeエンジンもあります。例としては、CollapsingMergeTreeAggregatingMergeTreeがあります。これらは特別な更新サポートとして扱うことができます。これらは実際には更新ではなく、ユーザーは通常、背景合併が実行される時間を制御できず、MergeTreeテーブル内のデータはほとんどの場合、完全に合併された形式ではなく、複数のパーツで保存されています。

レプリケーション

ClickHouseのレプリケーションはテーブルごとに設定できます。単一サーバでいくつかのレプリケートされたテーブルとレプリケートされていないテーブルを持つことができ、異なる方法でレプリケートされたテーブルも持つことができます。例えば、二重レプリケーションを持つテーブルと三重レプリケーションを持つ別のテーブルです。

レプリケーションは、ReplicatedMergeTreeストレージエンジンで実装されています。ZooKeeper内のパスはストレージエンジンのパラメータとして指定されます。同じZooKeeperパスを持つすべてのテーブルはお互いのレプリカになり、データを同期し、一貫性を保ちます。レプリカはテーブルの作成または削除で動的に追加および削除できます。

レプリケーションは、非同期のマルチマスター方式を使用します。ZooKeeperとのセッションを持つ任意のレプリカにデータを挿入でき、そのデータは他のすべてのレプリカに非同期でレプリケートされます。ClickHouseはUPDATEをサポートしていないため、レプリケーションは競合しません。デフォルトでは、挿入のクォーラムによる確認がないため、新しく挿入されたデータはードの障害が発生した場合に失われる可能性があります。挿入クォーラムは、insert_quorum設定を使用して有効にできます。

レプリケーションのメタデータはZooKeeperに格納されます。実行するアクションのリストレプリケーションログがあります。アクションは、パーツの取得、パーツのマージ、パーティションの削除などです。各レプリカは、レプリケーションログをキューにコピーし、キューからアクションを実行します。例えば、挿入時には「パーツを取得」アクションがログに作成され、すべてのレプリカはそのパーツをダウンロードします。マージは、すべてのレプリカでバイト一致の結果を得るために調整されています。すべてのパーツはすべてのレプリカで同じようにマージされます。リーダーの一つは最初に新しいマージを開始し、「マージパーツ」アクションをログに書き込みます。複数のレプリカまたはすべてが同時にリーダーになることができます。レプリカがリーダーにならないように、merge_tree設定のreplicated_can_become_leaderを使用して防ぐことができます。リーダーは背景合併のスケジュールを担当します。

レプリケーションは物理的です:クエリではなく、圧縮された部分だけがノード間で転送されます。マージは通常、ネットワークコストを削減し、ネットワーク増幅を回避するために各レプリカで独立して処理されます。レプリケーションの遅れが大きい場合に限り、大きくマージされたパーツがネットワーク内で送信されます。

また、各レプリカはその状態をZooKeeperにセットされ、それらのチェックサムも含まれる。ローカルファイルシステムでの状態がZooKeeperでの参照状態と異なる場合、そのレプリカは他のレプリカから欠落したり破壊されたパーツをダウンロードして一貫性を回復します。予期しない、もしくは破損したデータがローカルファイルシステム上にある場合、ClickHouseはそれを削除せず、別のディレクトリに移動し忘れます。

:::note ClickHouseクラスターは独立したシャードで構成されており、それぞれのシャードにはレプリカがあります。クラスターはエラスティックではないため、新しいシャードを追加しても、シャード間でデータが自動的に再分散されません。代わりに、クラスターの負荷が均等に調整されると想定されます。小さなクラスター、例えば10ード程度ではこの実装はコントロールがしやすいですが、プロダクションで使用する数百ードのクラスターに対して、このアプローチは大きな欠点になります。クラスター全体に広がり、自動的にクラスター間で分割しバランスされる動的にレプリケートされるリージョンを持つテーブルエンジンを実装する必要があります。 :::