动态

详情 返回 返回

kubernetes indexer源碼解析 - 动态 详情

kubernetes indexer源碼解析

kubernetes indexer是實現了多索引的本地緩存

1.背景

以db中學校學生表做本地緩存為例

type Student struct {
    ID uint32
    Name string
    Class uint32
}

// 本地緩存,id和學生的映射
var idCache = map[uint32]Student{}

當需要以學生名來取值時,此時沒有合適的緩存,可以再添加一份

// 本地緩存,名稱和學生的映射
var idCache = map[string]Student{}

但這樣緩存就存了兩份,浪費了內存。可以維護一個學生名和id的索引,在拖過id的緩存取值即可

// 索引,名稱和id的映射
var index = map[string]id{}

這樣要通過名稱查緩存,通過idCache[index[name]]即可。kubernetes indexer就是類似的思路,indexer支持任意類型,線程安全

2.使用

參考TestMultiIndexKeys單側

// 取pod中標籤foo的值作為索引值
func testIndexFunc(obj interface{}) ([]string, error) {
    pod := obj.(*v1.Pod)
    return []string{pod.Labels["foo"]}, nil
}

func TestMultiIndexKeys(t *testing.T) {
    // 創建索引器
    index := NewIndexer(MetaNamespaceKeyFunc, Indexers{"byUser": testUsersIndexFunc})

    pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "one", Annotations: map[string]string{"users": "ernie,bert"}}}
    pod2 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "two", Annotations: map[string]string{"users": "bert,oscar"}}}
    pod3 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "tre", Annotations: map[string]string{"users": "ernie,elmo"}}}
    // 添加索引
    index.Add(pod1)
    index.Add(pod2)
    index.Add(pod3)

    expected := map[string]sets.String{}
    expected["ernie"] = sets.NewString("one", "tre")
    expected["bert"] = sets.NewString("one", "two")
    expected["elmo"] = sets.NewString("tre")
    expected["oscar"] = sets.NewString("two")
    expected["elmo1"] = sets.NewString()
    {
        for k, v := range expected {
            found := sets.String{}
            // 根據索引名+緩存key獲取索引結果
            indexResults, err := index.ByIndex("byUser", k)
            if err != nil {
                t.Errorf("Unexpected error %v", err)
            }
            for _, item := range indexResults {
                found.Insert(item.(*v1.Pod).Name)
            }
            if !found.Equal(v) {
                t.Errorf("missing items, index %s, expected %v but found %v", k, v.List(), found.List())
            }
        }
    }
    // 驗證刪除索引
    index.Delete(pod3)
    erniePods, err := index.ByIndex("byUser", "ernie")
    if err != nil {
        t.Errorf("unexpected error: %v", err)
    }
    if len(erniePods) != 1 {
        t.Errorf("Expected 1 pods but got %v", len(erniePods))
    }
    for _, erniePod := range erniePods {
        if erniePod.(*v1.Pod).Name != "one" {
            t.Errorf("Expected only 'one' but got %s", erniePod.(*v1.Pod).Name)
        }
    }

    elmoPods, err := index.ByIndex("byUser", "elmo")
    if err != nil {
        t.Errorf("unexpected error: %v", err)
    }
    if len(elmoPods) != 0 {
        t.Errorf("Expected 0 pods but got %v", len(elmoPods))
    }

    copyOfPod2 := pod2.DeepCopy()
    copyOfPod2.Annotations["users"] = "oscar"
    // 驗證更新索引
    index.Update(copyOfPod2)
    bertPods, err := index.ByIndex("byUser", "bert")
    if err != nil {
        t.Errorf("unexpected error: %v", err)
    }
    if len(bertPods) != 1 {
        t.Errorf("Expected 1 pods but got %v", len(bertPods))
    }
    for _, bertPod := range bertPods {
        if bertPod.(*v1.Pod).Name != "one" {
            t.Errorf("Expected only 'one' but got %s", bertPod.(*v1.Pod).Name)
        }
    }
}

3.源碼解析

func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
    return &cache{
        // 線程安全的存儲
        cacheStorage: NewThreadSafeStore(indexers, Indices{}),
        // 緩存key計算函數
        keyFunc:      keyFunc,
    }
}
// 線程安全的存儲
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
    return &threadSafeMap{
        // 緩存
        items: map[string]interface{}{},
        index: &storeIndex{
            // Indexers是map[string]IndexFunc類型,保存索引名與索引值計算函數的映射
            indexers: indexers,
            // Indices是map[string]Index類型,通過索引名找到Index,在通過索引值找到緩存key
            indices:  indices,
        },
    }
}
func (c *cache) Add(obj interface{}) error {
    // 計算緩存key
    key, err := c.keyFunc(obj)
    if err != nil {
        return KeyError{obj, err}
    }
    c.cacheStorage.Add(key, obj)
    return nil
}
func (c *threadSafeMap) Add(key string, obj interface{}) {
    c.Update(key, obj)
}

func (c *threadSafeMap) Update(key string, obj interface{}) {
    // 更新操作,加鎖
    c.lock.Lock()
    defer c.lock.Unlock()
    // 獲取舊緩存對象
    oldObject := c.items[key]
    // 設置新緩存對象
    c.items[key] = obj
    // 更新索引
    c.index.updateIndices(oldObject, obj, key)
}
func (i *storeIndex) updateIndices(oldObj interface{}, newObj interface{}, key string) {
    var oldIndexValues, indexValues []string
    var err error
    // 遍歷所有索引器
    for name, indexFunc := range i.indexers {
        // 在緩存對象第一次添加的時候,oldObj為nil
        if oldObj != nil {
            // 計算舊的索引值
            oldIndexValues, err = indexFunc(oldObj)
        } else {
            oldIndexValues = oldIndexValues[:0]
        }
        if err != nil {
            panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
        }

        if newObj != nil {
            // 計算新的索引值
            indexValues, err = indexFunc(newObj)
        } else {
            indexValues = indexValues[:0]
        }
        if err != nil {
            panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
        }
        // 通過索引名找到索引,為空則創建
        index := i.indices[name]
        if index == nil {
            index = Index{}
            i.indices[name] = index
        }
        // 一個小優化,當添加的新值和舊值都為1並且相同時,無需處理
        if len(indexValues) == 1 && len(oldIndexValues) == 1 && indexValues[0] == oldIndexValues[0] {
            // We optimize for the most common case where indexFunc returns a single value which has not been changed
            continue
        }
        // 從索引中刪除key
        for _, value := range oldIndexValues {
            i.deleteKeyFromIndex(key, value, index)
        }
        // 從索引中添加key
        for _, value := range indexValues {
            i.addKeyToIndex(key, value, index)
        }
    }
}

func (i *storeIndex) addKeyToIndex(key, indexValue string, index Index) {
    set := index[indexValue]
    if set == nil {
        set = sets.String{}
        index[indexValue] = set
    }
    // 索引名+索引值可能對應多個緩存key,通過set去重
    set.Insert(key)
}

func (i *storeIndex) deleteKeyFromIndex(key, indexValue string, index Index) {
    set := index[indexValue]
    if set == nil {
        return
    }
    // 刪除緩存key
    set.Delete(key)
    // 當緩存key的set為空時,刪除索引值的映射,避免oom
    if len(set) == 0 {
        delete(index, indexValue)
    }
}

4.總結

kubernetes indexer在實現多索引的本地緩存的思路,特別在代碼可複用方面,值得我們學習借鑑

Add a new 评论

Some HTML is okay.