ClickHouse/programs/diagnostics/internal/collectors/clickhouse/zookeeper.go
2022-07-01 11:43:11 +02:00

154 lines
5.5 KiB
Go

package clickhouse
import (
"fmt"
"strings"
"github.com/ClickHouse/ClickHouse/programs/diagnostics/internal/collectors"
"github.com/ClickHouse/ClickHouse/programs/diagnostics/internal/platform"
"github.com/ClickHouse/ClickHouse/programs/diagnostics/internal/platform/config"
"github.com/ClickHouse/ClickHouse/programs/diagnostics/internal/platform/data"
"github.com/bmatcuk/doublestar/v4"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
)
// This collector collects the system zookeeper db
type ZookeeperCollector struct {
resourceManager *platform.ResourceManager
}
func NewZookeeperCollector(m *platform.ResourceManager) *ZookeeperCollector {
return &ZookeeperCollector{
resourceManager: m,
}
}
func (zkc *ZookeeperCollector) Collect(conf config.Configuration) (*data.DiagnosticBundle, error) {
conf, err := conf.ValidateConfig(zkc.Configuration())
if err != nil {
return &data.DiagnosticBundle{}, err
}
pathPattern, err := config.ReadStringValue(conf, "path_pattern")
if err != nil {
return &data.DiagnosticBundle{}, err
}
defaultPattern, _ := zkc.Configuration().GetConfigParam("path_pattern")
if defaultPattern.(config.StringParam).Value != pathPattern {
log.Warn().Msgf("Using non default zookeeper glob pattern [%s] - this can potentially cause high query load", pathPattern)
}
maxDepth, err := config.ReadIntValue(conf, "max_depth")
if err != nil {
return &data.DiagnosticBundle{}, err
}
rowLimit, err := config.ReadIntValue(conf, "row_limit")
if err != nil {
return &data.DiagnosticBundle{}, err
}
// we use doublestar for globs as it provides us with ** but also allows us to identify prefix or base paths
if !doublestar.ValidatePattern(pathPattern) {
return &data.DiagnosticBundle{}, errors.Wrapf(err, "%s is not a valid pattern", pathPattern)
}
base, _ := doublestar.SplitPattern(pathPattern)
frames := make(map[string]data.Frame)
hFrame, frameErrors := zkc.collectSubFrames(base, pathPattern, rowLimit, 0, maxDepth)
fErrors := data.FrameErrors{
Errors: frameErrors,
}
frames["zookeeper_db"] = hFrame
return &data.DiagnosticBundle{
Frames: frames,
Errors: fErrors,
}, nil
}
// recursively iterates over the zookeeper sub tables to a max depth, applying the filter and max rows per table
func (zkc *ZookeeperCollector) collectSubFrames(path, pathPattern string, rowLimit, currentDepth, maxDepth int64) (data.HierarchicalFrame, []error) {
var frameErrors []error
var subFrames []data.HierarchicalFrame
currentDepth += 1
if currentDepth == maxDepth {
return data.HierarchicalFrame{}, frameErrors
}
match, err := doublestar.PathMatch(pathPattern, path)
if err != nil {
frameErrors = append(frameErrors, errors.Wrapf(err, "Path match failed for pattern %s with path %s", pathPattern, path))
return data.HierarchicalFrame{}, frameErrors
}
// we allow a single level to be examined or we never get going
if !match && currentDepth > 1 {
return data.HierarchicalFrame{}, frameErrors
}
frame, err := zkc.resourceManager.DbClient.ExecuteStatement(path, fmt.Sprintf("SELECT name FROM system.zookeeper WHERE path='%s' LIMIT %d", path, rowLimit))
if err != nil {
frameErrors = append(frameErrors, errors.Wrapf(err, "Unable to read zookeeper table path for sub paths %s", path))
return data.HierarchicalFrame{}, frameErrors
}
// this isn't ideal, we add re-execute the query to our collection as this will be consumed by the output lazily
outputFrame, err := zkc.resourceManager.DbClient.ExecuteStatement(path, fmt.Sprintf("SELECT * FROM system.zookeeper WHERE path='%s' LIMIT %d", path, rowLimit))
if err != nil {
frameErrors = append(frameErrors, errors.Wrapf(err, "Unable to read zookeeper table path %s", path))
return data.HierarchicalFrame{}, frameErrors
}
frameComponents := strings.Split(path, "/")
frameId := frameComponents[len(frameComponents)-1]
for {
values, ok, err := frame.Next()
if err != nil {
frameErrors = append(frameErrors, errors.Wrapf(err, "unable to read frame %s", frame.Name()))
return data.NewHierarchicalFrame(frameId, outputFrame, subFrames), frameErrors
}
if !ok {
return data.NewHierarchicalFrame(frameId, outputFrame, subFrames), frameErrors
}
subName := fmt.Sprintf("%v", values[0])
subPath := fmt.Sprintf("%s/%s", path, subName)
subFrame, errs := zkc.collectSubFrames(subPath, pathPattern, rowLimit, currentDepth, maxDepth)
if subFrame.Name() != "" {
subFrames = append(subFrames, subFrame)
}
frameErrors = append(frameErrors, errs...)
}
}
func (zkc *ZookeeperCollector) Configuration() config.Configuration {
return config.Configuration{
Params: []config.ConfigParam{
config.StringParam{
Value: "/clickhouse/{task_queue}/**",
Param: config.NewParam("path_pattern", "Glob pattern for zookeeper path matching. Change with caution.", false),
},
config.IntParam{
Value: 8,
Param: config.NewParam("max_depth", "Max depth for zookeeper navigation.", false),
},
config.IntParam{
Value: 10,
Param: config.NewParam("row_limit", "Maximum number of rows/sub nodes to collect/expand from any zookeeper leaf. Negative values mean unlimited.", false),
},
},
}
}
func (zkc *ZookeeperCollector) IsDefault() bool {
return false
}
func (zkc *ZookeeperCollector) Description() string {
return "Collects Zookeeper information available within ClickHouse."
}
// here we register the collector for use
func init() {
collectors.Register("zookeeper_db", func() (collectors.Collector, error) {
return &ZookeeperCollector{
resourceManager: platform.GetResourceManager(),
}, nil
})
}