博客 / 詳情

返回

n9e告警可高用的實現機制分析

n9e=nightingale

n9e監控告警框架,提供了監控繪圖、監控告警以及通知等一體的監控運維體系,在雲原生時代,可以認為是Open-falcon的升級版。

一. 告警的數據流

指標存儲:使用push模式

  • categraf採集後push給n9e-server;
  • n9e-server將指標值push給時序庫prometheus;

指標告警:使用pull模式

  • 由n9e-server使用PromQL向prometheus進行查詢;
  • 若查詢到結果,表示告警被觸發;

image.png

二. 告警實例的高可用

告警配置被存儲在MySQL中,n9e-server從MySQL中讀出,然後查詢其中的PromQL,判定告警是否被觸發。

1. 問題

類似於prometheus,當部署多個n9e-server實例後,若每個n9e-server都進行上面的告警判定過程,會導致每個n9e-server上都有告警產生。

prometheus是通過alertmanager進行告警去重的,那n9e-server是如何實現的呢?

2. 解決

n9e-server通過heartbeat+一致性hash,對告警規則進行分片,每個n9e-server只負責自己實例的告警規定的判定。

  • 當n9e-server有實例加入時,已有的n9e-server實例會進行rehash,讓新節點負責一部分告警規則;
  • 當n9e-server有實例退出時,所有的n9e-server實例會進行rehash,退出節點的告警規則被分給存活的節點;

Heartbeat:

  • n9e-server實例啓動一個heartbeat線程,定期的上報自己的狀態並存儲在MySQL中;
  • 若發現進程內保存的實例列表MySQL的實例列表不同,説明發生了節點變化,則進行rehash;

一致性hash:

  • 對某個datasource,比如prometheus-a1,有N個n9e-server註冊進來,則將N個n9e-server組成一致性hash環;
  • 對某個告警規則,使用rule.id進行一致性hash,命中到哪個n9e-server實例,就由其進行告警判定;

image.png

三. 源碼分析

實現代碼主要在alert/naming/*.go。

在做告警判定時,對每個告警規則:

  • 若rule不歸本節點負責,則跳過;
// alert/eval/alert_rule.go
func (s *Scheduler) syncAlertRules() {
    ids := s.alertRuleCache.GetRuleIds()
    alertRuleWorkers := make(map[string]*AlertRuleWorker)
    externalRuleWorkers := make(map[string]*process.Processor)
    for _, id := range ids {
        rule := s.alertRuleCache.Get(id)
        if rule == nil {
            continue
        }
        if rule.IsPrometheusRule() {
            datasourceIds := s.promClients.Hit(rule.DatasourceIdsJson)
            for _, dsId := range datasourceIds {
                // 判定rule是否歸本節點負責
                if !naming.DatasourceHashRing.IsHit(dsId, fmt.Sprintf("%d", rule.Id), s.aconf.Heartbeat.Endpoint) {
                    continue
                }
                ...
            }
        }
        ...
    }
    ...
}

1. 一致性hash

一致性hash的key=rule.id,根據key判斷該rule是否歸本節點負責:

// alert/naming/hashring.go
func (chr *DatasourceHashRingType) IsHit(datasourceId int64, pk string, currentNode string) bool {
    node, err := chr.GetNode(datasourceId, pk)
    ...
    return node == currentNode
}

根據key確定節點的過程:

  • 對每個datasource,都有一個hash環:
  • 先找到datasource對應的hash環;
  • 再在hash環中找負責的節點;
// alert/naming/hashring.go
func (chr *DatasourceHashRingType) GetNode(datasourceId int64, pk string) (string, error) {
    chr.Lock()
    defer chr.Unlock()
    _, exists := chr.Rings[datasourceId]
    if !exists {
        ...
    }
    return chr.Rings[datasourceId].Get(pk)
}

hash環的構造在heartbeat中。

2. heartbeat

n9e-server中開啓一個goroutine,專門進行heartbeat:

  • 默認1s執行1次;
// alert/naming/heartbeat.go
func (n *Naming) loopHeartbeat() {
    interval := time.Duration(n.heartbeatConfig.Interval) * time.Millisecond
    for {
        time.Sleep(interval)
        if err := n.heartbeat(); err != nil {
            logger.Warning(err)
        }
    }
}

heartbeat的過程:

  • 對每個datasource,查找其關聯的最新的n9e-server列表=newss;
  • 跟緩存的oldss對比,若有變化,則對server進行rehash,重新構造hash環;
// alert/naming/heartbeat.go
func (n *Naming) heartbeat() error {
    // 在頁面上維護實例和集羣的對應關係
    datasourceIds, err = models.GetDatasourceIdsByEngineName(n.ctx, n.heartbeatConfig.EngineName)

    for i := 0; i < len(datasourceIds); i++ {
        servers, err := n.ActiveServers(datasourceIds[i])
        ...
        sort.Strings(servers)
        newss := strings.Join(servers, " ")

        oldss, exists := localss[datasourceIds[i]]
        if exists && oldss == newss {
            continue            // 沒有變化
        }

        RebuildConsistentHashRing(datasourceIds[i], servers)    // rehash
        localss[datasourceIds[i]] = newss
    }
    return nil
}

查詢datasource關聯的最新n9e-server的過程:

  • 實際上就查詢alerting_engines表,找目標datasource對應的記錄,要求30s內有心跳;
// alert/naming/heartbeat.go
func (n *Naming) ActiveServers(datasourceId int64) ([]string, error) {
    ...
    // 30秒內有心跳,就認為是活的
    return models.AlertingEngineGetsInstances(n.ctx, "datasource_id = ? and clock > ?", datasourceId, time.Now().Unix()-30)
}

func AlertingEngineGetsInstances(ctx *ctx.Context, where string, args ...interface{}) ([]string, error) {
    var arr []string
    var err error
    session := DB(ctx).Model(new(AlertingEngines)).Order("instance")
    if where == "" {
        err = session.Pluck("instance", &arr).Error
    } else {
        err = session.Where(where, args...).Pluck("instance", &arr).Error
    }
    return arr, err
}
mysql> select * from alerting_engines;
+----+----------------------+---------------+----------------+------------+
| id | instance             | datasource_id | engine_cluster | clock      |
+----+----------------------+---------------+----------------+------------+
|  2 | 192.168.100.20:17000 |      99999999 | default        | 1690537360 |
|  3 | 192.168.100.20:17000 |             1 | default        | 1690537360 |
|  4 | 192.168.100.20:17000 |             2 | default        | 1690537360 |
+----+----------------------+---------------+----------------+------------+
3 rows in set (0.00 sec)

n9e-server實例,在heartbeat的過程中,會將自己註冊進去:

// alert/naming/heartbeat.go
func (n *Naming) heartbeat() error {
    ...
    for i := 0; i < len(datasourceIds); i++ {
        err := models.AlertingEngineHeartbeatWithCluster(n.ctx, n.heartbeatConfig.Endpoint, n.heartbeatConfig.EngineName, datasourceIds[i])
        if err != nil {
            logger.Warningf("heartbeat with cluster %d err:%v", datasourceIds[i], err)
        }
    }
    ...    
}

func AlertingEngineHeartbeatWithCluster(ctx *ctx.Context, instance, cluster string, datasourceId int64) error {
    ...
    var total int64
    err := DB(ctx).Model(new(AlertingEngines)).Where("instance=? and engine_cluster = ? and datasource_id=?", instance, cluster, datasourceId).Count(&total).Error
    ...
    // 沒有記錄,則insert
    if total == 0 {
        // insert
        err = DB(ctx).Create(&AlertingEngines{
            Instance:      instance,
            DatasourceId:  datasourceId,
            EngineCluster: cluster,
            Clock:         time.Now().Unix(),
        }).Error
    } else { // 否則,update最新時間
        // updates
        fields := map[string]interface{}{"clock": time.Now().Unix()}
        err = DB(ctx).Model(new(AlertingEngines)).Where("instance=? and engine_cluster = ? and datasource_id=?", instance, cluster, datasourceId).Updates(fields).Error
    }
    return err
}

參考:

  1. https://flashcat.cloud/docs/content/flashcat-monitor/nighting...
  2. https://answer.flashcat.cloud/questions/10010000000002963/100...
user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.