温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

go监控方案的实现是怎样的

发布时间:2021-11-15 14:21:25 来源:亿速云 阅读:169 作者:柒染 栏目:大数据

这期内容当中小编将会给大家带来有关go监控方案的实现是怎样的,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

metrics 客户端

数据采集使用go-metrics

传输使用UDP, 仿StatsD上传采集数据, InfluxDB进行数据存储, Grafana进行展示。

实现github 地址

https://github.com/solate/metrics

该地址有已经改好的配置文件可以直接使用。

使用的all-in-one :

git docker-statsd-influxdb-grafana

docker hub 地址

数据封装

//挂载配置文件,已修改statsd模版
docker run --ulimit nofile=66000:66000  -v /root/telegraf.conf:/etc/telegraf/telegraf.conf   -d   --name docker-statsd-influxdb-grafana   -p 3003:3003   -p 3004:8888   -p 8086:8086   -p 8125:8125/udp   samuelebistoletti/docker-statsd-influxdb-grafana:latest

register

register 使用的name 必须是不同的

telegraf 配置修改

[[inputs.statsd]] 部分配置打开, 修改templates为:

   templates = [
      "* measurement.measurement.field"
   ]

表示传值prefix.name.field 最好表示为 prefix_name field

代码实现

package client

import (
	"bufio"
	"bytes"
	"github.com/rcrowley/go-metrics"
	"log"
	"net"
	"strconv"
	"strings"
	"time"
)

const (
	// UDP packet limit, see
	// https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure
	UDP_MAX_PACKET_SIZE int = 64 * 1024
)

// Config provides a container with configuration parameters for
// the StatsD exporter
type Config struct {
	Network       string           // Network: tcp, udp.
	Addr          string           // Network address to connect to | 地址
	Registry      metrics.Registry // Registry to be exported | metrics注册
	FlushInterval time.Duration    // Flush interval | 刷新间隔时间
	Prefix        string           // Prefix to be prepended to metric names | 前缀名字
	Rate          float32          // Rate
	Tags          string           // tag //TODO

	conn net.Conn
}

func StatsD(r metrics.Registry, d time.Duration, prefix string, network string, addr string, rate float32) {

	conn, err := net.Dial(network, addr)
	if err != nil {
		panic("conn remote err!")
	}

	StatsDWithConfig(Config{
		Network:       network,
		Addr:          addr,
		Registry:      r,
		FlushInterval: d,
		Prefix:        prefix,
		Rate:          rate,
		conn:          conn,
	})

}

// WithConfig is a blocking exporter function
func StatsDWithConfig(c Config) {
	for _ = range time.Tick(c.FlushInterval) {
		if err := statsd(&c); err != nil {
			log.Println(err)
			c.conn.Close()
		}
	}
}

func statsd(c *Config) (err error) {

	w := bufio.NewWriter(c.conn)

	c.Registry.Each(func(name string, i interface{}) {
		switch metric := i.(type) {
		case metrics.Counter:
			ms := metric.Snapshot()
			w.Write(statsdLine(c.Prefix, name, "", ms.Count(), "|c", c.Rate))
		case metrics.Gauge:
			ms := metric.Snapshot()
			w.Write(statsdLine(c.Prefix, name, "", ms.Value(), "|g", c.Rate))
		case metrics.GaugeFloat64:
			ms := metric.Snapshot()
			w.Write(statsdLine(c.Prefix, name, "", ms.Value(), "|g", c.Rate))
		case metrics.Histogram:
			ms := metric.Snapshot()
			ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})

			fields := make([][]byte, 12)
			fields = append(fields, statsdLine(c.Prefix, name, "count", ms.Count(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "max", ms.Max(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "mean", ms.Mean(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "min", ms.Min(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "stddev", ms.StdDev(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "variance", ms.Variance(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "p50", ps[0], "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "p75", ps[1], "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "p95", ps[2], "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "p99", ps[3], "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "p999", ps[4], "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "p9999", ps[5], "|g", c.Rate))

			buf := bytes.Join(fields, []byte{})
			w.Write(buf)
		case metrics.Meter:
			ms := metric.Snapshot()
			fields := make([][]byte, 5)
			fields = append(fields, statsdLine(c.Prefix, name, "count", ms.Count(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "m1", ms.Rate1(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "m5", ms.Rate5(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "m15", ms.Rate15(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "mean", ms.RateMean(), "|g", c.Rate))

			buf := bytes.Join(fields, []byte{})
			w.Write(buf)

		case metrics.Timer:
			ms := metric.Snapshot()
			ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})

			fields := make([][]byte, 12)
			fields = append(fields, statsdLine(c.Prefix, name, "count", ms.Count(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "max", ms.Max(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "mean", ms.Mean(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "min", ms.Min(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "stddev", ms.StdDev(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "variance", ms.Variance(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "p50", ps[0], "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "p75", ps[1], "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "p95", ps[2], "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "p99", ps[3], "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "p999", ps[4], "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "p9999", ps[5], "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "m1", ms.Rate1(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "m5", ms.Rate5(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "m15", ms.Rate15(), "|g", c.Rate))
			fields = append(fields, statsdLine(c.Prefix, name, "mean", ms.RateMean(), "|g", c.Rate))

			buf := bytes.Join(fields, []byte{})

			w.Write(buf)

			//case metrics.Healthcheck:
			//	metric.Check()
			//	log.Printf("healthcheck %s\n", name)
			//	log.Printf("  error:       %v\n", metric.Error())
			//
			//case metrics.EWMA:
			//case metrics.Sample:

		}

		w.Flush()
	})

	return
}

//构造发送line
func statsdLine(prefix, name, field string, value interface{}, suffix string, rate float32) []byte {

	//<metricname>:<value>|<type>|@<rate>
	var buffer bytes.Buffer

	//buf := make([]byte, UDP_MAX_PACKET_SIZE)

	//添加前缀
	if prefix != "" {
		//buf = append(buf, prefix...)
		//buf = append(buf, '.')
		buffer.WriteString(prefix)
		buffer.WriteString(".")
	} else {
		buffer.WriteString("statsd")
		buffer.WriteString(".")
	}

	////添加名称
	//buf = append(buf, name...)
	//buf = append(buf, ':')

	//将name注册中的'.'替换成'_', 配合telegraf修改模版,防止将数据库名字改为属性
	if strings.Contains(name, ".") {
		name = strings.ReplaceAll(name, ".", "_")
	}
	//添加名称
	buffer.WriteString(name)

	if field != "" {
		buffer.WriteString(".")
		buffer.WriteString(field)
	}
	buffer.WriteString(":")

	buf := buffer.Bytes()

	switch v := value.(type) {
	case string:
		buf = append(buf, v...)
	case int64:
		buf = strconv.AppendInt(buf, v, 10)
	case float64:
		buf = strconv.AppendFloat(buf, v, 'f', -1, 64)
	default:
		return nil
	}

	if suffix != "" {
		buf = append(buf, suffix...)
	}

	if rate != 0 && rate < 1 {
		buf = append(buf, "|@"...)
		buf = strconv.AppendFloat(buf, float64(rate), 'f', 6, 32)
	}

	buf = append(buf, "\n"...) //每一行打一个回车,telegraf 是使用回车进行读取的
	return buf

}

上述就是小编为大家分享的go监控方案的实现是怎样的了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注亿速云行业资讯频道。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

go
AI