mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-15 19:02:04 +00:00
154 lines
5.5 KiB
Go
154 lines
5.5 KiB
Go
package clickhouse
|
|
|
|
import (
|
|
"fmt"
|
|
"strings"
|
|
|
|
"github.com/ClickHouse/ClickHouse/programs/diagnostics/internal/collectors"
|
|
"github.com/ClickHouse/ClickHouse/programs/diagnostics/internal/platform"
|
|
"github.com/ClickHouse/ClickHouse/programs/diagnostics/internal/platform/config"
|
|
"github.com/ClickHouse/ClickHouse/programs/diagnostics/internal/platform/data"
|
|
"github.com/bmatcuk/doublestar/v4"
|
|
"github.com/pkg/errors"
|
|
"github.com/rs/zerolog/log"
|
|
)
|
|
|
|
// This collector collects the system zookeeper db
|
|
|
|
type ZookeeperCollector struct {
|
|
resourceManager *platform.ResourceManager
|
|
}
|
|
|
|
func NewZookeeperCollector(m *platform.ResourceManager) *ZookeeperCollector {
|
|
return &ZookeeperCollector{
|
|
resourceManager: m,
|
|
}
|
|
}
|
|
|
|
func (zkc *ZookeeperCollector) Collect(conf config.Configuration) (*data.DiagnosticBundle, error) {
|
|
conf, err := conf.ValidateConfig(zkc.Configuration())
|
|
if err != nil {
|
|
return &data.DiagnosticBundle{}, err
|
|
}
|
|
|
|
pathPattern, err := config.ReadStringValue(conf, "path_pattern")
|
|
if err != nil {
|
|
return &data.DiagnosticBundle{}, err
|
|
}
|
|
defaultPattern, _ := zkc.Configuration().GetConfigParam("path_pattern")
|
|
if defaultPattern.(config.StringParam).Value != pathPattern {
|
|
log.Warn().Msgf("Using non default zookeeper glob pattern [%s] - this can potentially cause high query load", pathPattern)
|
|
}
|
|
maxDepth, err := config.ReadIntValue(conf, "max_depth")
|
|
if err != nil {
|
|
return &data.DiagnosticBundle{}, err
|
|
}
|
|
rowLimit, err := config.ReadIntValue(conf, "row_limit")
|
|
if err != nil {
|
|
return &data.DiagnosticBundle{}, err
|
|
}
|
|
// we use doublestar for globs as it provides us with ** but also allows us to identify prefix or base paths
|
|
if !doublestar.ValidatePattern(pathPattern) {
|
|
return &data.DiagnosticBundle{}, errors.Wrapf(err, "%s is not a valid pattern", pathPattern)
|
|
}
|
|
base, _ := doublestar.SplitPattern(pathPattern)
|
|
frames := make(map[string]data.Frame)
|
|
hFrame, frameErrors := zkc.collectSubFrames(base, pathPattern, rowLimit, 0, maxDepth)
|
|
fErrors := data.FrameErrors{
|
|
Errors: frameErrors,
|
|
}
|
|
frames["zookeeper_db"] = hFrame
|
|
return &data.DiagnosticBundle{
|
|
Frames: frames,
|
|
Errors: fErrors,
|
|
}, nil
|
|
}
|
|
|
|
// recursively iterates over the zookeeper sub tables to a max depth, applying the filter and max rows per table
|
|
func (zkc *ZookeeperCollector) collectSubFrames(path, pathPattern string, rowLimit, currentDepth, maxDepth int64) (data.HierarchicalFrame, []error) {
|
|
var frameErrors []error
|
|
var subFrames []data.HierarchicalFrame
|
|
|
|
currentDepth += 1
|
|
if currentDepth == maxDepth {
|
|
return data.HierarchicalFrame{}, frameErrors
|
|
}
|
|
match, err := doublestar.PathMatch(pathPattern, path)
|
|
if err != nil {
|
|
frameErrors = append(frameErrors, errors.Wrapf(err, "Path match failed for pattern %s with path %s", pathPattern, path))
|
|
return data.HierarchicalFrame{}, frameErrors
|
|
}
|
|
// we allow a single level to be examined or we never get going
|
|
if !match && currentDepth > 1 {
|
|
return data.HierarchicalFrame{}, frameErrors
|
|
}
|
|
frame, err := zkc.resourceManager.DbClient.ExecuteStatement(path, fmt.Sprintf("SELECT name FROM system.zookeeper WHERE path='%s' LIMIT %d", path, rowLimit))
|
|
if err != nil {
|
|
frameErrors = append(frameErrors, errors.Wrapf(err, "Unable to read zookeeper table path for sub paths %s", path))
|
|
return data.HierarchicalFrame{}, frameErrors
|
|
}
|
|
|
|
// this isn't ideal, we add re-execute the query to our collection as this will be consumed by the output lazily
|
|
outputFrame, err := zkc.resourceManager.DbClient.ExecuteStatement(path, fmt.Sprintf("SELECT * FROM system.zookeeper WHERE path='%s' LIMIT %d", path, rowLimit))
|
|
if err != nil {
|
|
frameErrors = append(frameErrors, errors.Wrapf(err, "Unable to read zookeeper table path %s", path))
|
|
return data.HierarchicalFrame{}, frameErrors
|
|
}
|
|
frameComponents := strings.Split(path, "/")
|
|
frameId := frameComponents[len(frameComponents)-1]
|
|
|
|
for {
|
|
values, ok, err := frame.Next()
|
|
if err != nil {
|
|
frameErrors = append(frameErrors, errors.Wrapf(err, "unable to read frame %s", frame.Name()))
|
|
return data.NewHierarchicalFrame(frameId, outputFrame, subFrames), frameErrors
|
|
}
|
|
if !ok {
|
|
return data.NewHierarchicalFrame(frameId, outputFrame, subFrames), frameErrors
|
|
}
|
|
subName := fmt.Sprintf("%v", values[0])
|
|
subPath := fmt.Sprintf("%s/%s", path, subName)
|
|
subFrame, errs := zkc.collectSubFrames(subPath, pathPattern, rowLimit, currentDepth, maxDepth)
|
|
if subFrame.Name() != "" {
|
|
subFrames = append(subFrames, subFrame)
|
|
}
|
|
frameErrors = append(frameErrors, errs...)
|
|
}
|
|
}
|
|
|
|
func (zkc *ZookeeperCollector) Configuration() config.Configuration {
|
|
return config.Configuration{
|
|
Params: []config.ConfigParam{
|
|
config.StringParam{
|
|
Value: "/clickhouse/{task_queue}/**",
|
|
Param: config.NewParam("path_pattern", "Glob pattern for zookeeper path matching. Change with caution.", false),
|
|
},
|
|
config.IntParam{
|
|
Value: 8,
|
|
Param: config.NewParam("max_depth", "Max depth for zookeeper navigation.", false),
|
|
},
|
|
config.IntParam{
|
|
Value: 10,
|
|
Param: config.NewParam("row_limit", "Maximum number of rows/sub nodes to collect/expand from any zookeeper leaf. Negative values mean unlimited.", false),
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
func (zkc *ZookeeperCollector) IsDefault() bool {
|
|
return false
|
|
}
|
|
|
|
func (zkc *ZookeeperCollector) Description() string {
|
|
return "Collects Zookeeper information available within ClickHouse."
|
|
}
|
|
|
|
// here we register the collector for use
|
|
func init() {
|
|
collectors.Register("zookeeper_db", func() (collectors.Collector, error) {
|
|
return &ZookeeperCollector{
|
|
resourceManager: platform.GetResourceManager(),
|
|
}, nil
|
|
})
|
|
}
|