Groupcache (Part 1): Getting Started

This is the first post in a series on Groupcache: we run a simple Groupcache example and take a brief look at some of the code behind it.

Example & Running It

The example code is as follows:

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"

	"github.com/golang/groupcache"
)

func generateThumbnail(fileName string) []byte {
	result, err := ioutil.ReadFile(fileName)
	if err != nil {
		fmt.Println(err.Error())
		return nil
	}
	return result
}

func main() {
	// Declare this node's own address and its peers.
	me := "http://127.0.0.1:8080"
	peers := groupcache.NewHTTPPool(me)
	peers.Set("http://127.0.0.1:8081", "http://127.0.0.1:8082", "http://127.0.0.1:8083")

	// Create the Group instance.
	var thumbNails = groupcache.NewGroup("thumbnails", 64<<20, groupcache.GetterFunc(
		func(ctx groupcache.Context, key string, dest groupcache.Sink) error {
			fileName := key
			dest.SetBytes(generateThumbnail(fileName))
			return nil
		}))

	// Routing.
	http.HandleFunc("/thumbnails/", func(rw http.ResponseWriter, r *http.Request) {
		var data []byte
		thumbNails.Get(nil, r.URL.Path[len("/thumbnails/"):], groupcache.AllocatingByteSliceSink(&data))
		rw.Write(data)
	})

	// Start the server.
	log.Fatal(http.ListenAndServe(me[len("http://"):], nil))
}

This code is fleshed out from: https://talks.golang.org/2013/oscon-dl.slide#45

Steps to run:

  1. First, I put a handsome selfie named big-file.jpg in the project directory. After starting the server and opening http://localhost:8080/thumbnails/big-file.jpg in a browser, the handsome glow burst forth.
  2. Then I deleted the selfie locally and started another server listening on port 8081 (see the sketch after this list for one way to parameterize the address). Opening http://localhost:8081/thumbnails/big-file.jpg in a browser, even though big-file.jpg no longer exists on disk, the selfie had already been cached, so the handsome glow still burst forth.
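In the slide the listen address is hard-coded, so starting a second instance on 8081 means editing the source. One convenient alternative for step 2 is to parameterize the address; below is a minimal sketch, assuming a made-up -addr flag that is not part of the original example, with the group and handler registration elided:

package main

import (
	"flag"
	"log"
	"net/http"

	"github.com/golang/groupcache"
)

func main() {
	// Hypothetical flag so several instances can run without editing the source.
	addr := flag.String("addr", "http://127.0.0.1:8080", "address this instance serves on")
	flag.Parse()

	peers := groupcache.NewHTTPPool(*addr)
	// In a real deployment the peer list would come from configuration or
	// service discovery rather than being hard-coded.
	peers.Set("http://127.0.0.1:8080", "http://127.0.0.1:8081",
		"http://127.0.0.1:8082", "http://127.0.0.1:8083")

	// ... create the thumbnails group and register the /thumbnails/ handler
	// exactly as in the example above ...

	log.Fatal(http.ListenAndServe((*addr)[len("http://"):], nil))
}

Each instance would then be started with something like go run . -addr http://127.0.0.1:8081. Note also that NewHTTPPool registers an HTTP handler for its base path (/_groupcache/ by default) on http.DefaultServeMux, which is how peers fetch cached values from each other over HTTP.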

A Quick Look at the Code

Focus on the handler registered with HandleFunc(): the key call is Get(), through which data eventually receives the value that was written (if neither the cache nor the local getter can produce a value, an error is printed).
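Incidentally, the example handler above ignores the error returned by Get(). A slightly more defensive variant of that single HandleFunc registration (a sketch, not part of the original slide; it assumes the same thumbNails group) could look like this:

http.HandleFunc("/thumbnails/", func(rw http.ResponseWriter, r *http.Request) {
	var data []byte
	key := r.URL.Path[len("/thumbnails/"):]
	// Report the failure to the client instead of writing an empty body.
	if err := thumbNails.Get(nil, key, groupcache.AllocatingByteSliceSink(&data)); err != nil {
		http.Error(rw, err.Error(), http.StatusInternalServerError)
		return
	}
	rw.Write(data)
})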

Stepping into Get():

func (g *Group) Get(ctx Context, key string, dest Sink) error {
	g.peersOnce.Do(g.initPeers)
	g.Stats.Gets.Add(1)
	if dest == nil {
		return errors.New("groupcache: nil dest Sink")
	}
	value, cacheHit := g.lookupCache(key)

	if cacheHit {
		g.Stats.CacheHits.Add(1)
		return setSinkView(dest, value)
	}

	// Optimization to avoid double unmarshalling or copying: keep
	// track of whether the dest was already populated. One caller
	// (if local) will set this; the losers will not. The common
	// case will likely be one caller.
	destPopulated := false
	value, destPopulated, err := g.load(ctx, key, dest)
	if err != nil {
		return err
	}
	if destPopulated {
		return nil
	}
	return setSinkView(dest, value)
}

ctx is an opaque value and can usually be set to nil. dest is a Sink, an interface that bundles a set of methods for receiving data. Here the code passes in groupcache.AllocatingByteSliceSink(&data), which returns a struct instance that implements the Sink interface. The struct looks like this:

type allocBytesSink struct {
	dst *[]byte
	v   ByteView
}

So dest is an instance of this struct, and its dst field points at the data variable passed in. Get() first calls g.lookupCache(key); if that succeeds, the value was found in the cache, is written to the sink with setSinkView(), and is returned.
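As a small aside, groupcache ships other Sink constructors as well, and they are used the same way. A sketch (assuming the same thumbNails group from the example) using StringSink, which writes the value into a string instead of a byte slice:

var s string
// Same Get call, different Sink: the value ends up in s rather than in a []byte.
if err := thumbNails.Get(nil, "big-file.jpg", groupcache.StringSink(&s)); err != nil {
	log.Println(err)
}
// s now holds the cached bytes as a string.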

If that lookup fails, g.load(ctx, key, dest) is used to obtain the value that needs to be set:

// load loads key either by invoking the getter locally or by sending it to another machine.
func (g *Group) load(ctx Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
	g.Stats.Loads.Add(1)
	viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
		// Check the cache again because singleflight can only dedup calls
		// that overlap concurrently. It's possible for 2 concurrent
		// requests to miss the cache, resulting in 2 load() calls. An
		// unfortunate goroutine scheduling would result in this callback
		// being run twice, serially. If we don't check the cache again,
		// cache.nbytes would be incremented below even though there will
		// be only one entry for this key.
		//
		// Consider the following serialized event ordering for two
		// goroutines in which this callback gets called twice for hte
		// same key:
		// 1: Get("key")
		// 2: Get("key")
		// 1: lookupCache("key")
		// 2: lookupCache("key")
		// 1: load("key")
		// 2: load("key")
		// 1: loadGroup.Do("key", fn)
		// 1: fn()
		// 2: loadGroup.Do("key", fn)
		// 2: fn()
		if value, cacheHit := g.lookupCache(key); cacheHit {
			g.Stats.CacheHits.Add(1)
			return value, nil
		}
		g.Stats.LoadsDeduped.Add(1)
		var value ByteView
		var err error
		if peer, ok := g.peers.PickPeer(key); ok {
			value, err = g.getFromPeer(ctx, peer, key)
			if err == nil {
				g.Stats.PeerLoads.Add(1)
				return value, nil
			}
			g.Stats.PeerErrors.Add(1)
			// TODO(bradfitz): log the peer's error? keep
			// log of the past few for /groupcachez? It's
			// probably boring (normal task movement), so not
			// worth logging I imagine.
		}
		value, err = g.getLocally(ctx, key, dest)
		if err != nil {
			g.Stats.LocalLoadErrs.Add(1)
			return nil, err
		}
		g.Stats.LocalLoads.Add(1)
		destPopulated = true // only one caller of load gets this return value
		g.populateCache(key, value, &g.mainCache)
		return value, nil
	})
	if err == nil {
		value = viewi.(ByteView)
	}
	return
}
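The g.loadGroup.Do call wrapping the whole callback is the singleflight pattern: concurrent Get calls for the same key share a single execution of the loader, which is why the long comment above only has to worry about calls that do not overlap. The sketch below is my own toy program, not groupcache code; it uses the singleflight sub-package that groupcache ships to show the deduplication in isolation:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/golang/groupcache/singleflight"
)

func main() {
	var g singleflight.Group
	var loads int64

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// All ten goroutines ask for the same key; Do runs fn only once
			// per set of overlapping calls and hands the result to everyone.
			v, _ := g.Do("big-file.jpg", func() (interface{}, error) {
				atomic.AddInt64(&loads, 1)
				time.Sleep(10 * time.Millisecond) // simulate a slow load
				return "thumbnail bytes", nil
			})
			_ = v
		}()
	}
	wg.Wait()
	fmt.Println("underlying loads:", atomic.LoadInt64(&loads)) // typically 1, not 10
}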

Setting the remaining details aside, we can see that the method first tries g.getFromPeer(ctx, peer, key) when PickPeer assigns the key to a remote peer, and otherwise falls back to g.getLocally(ctx, key, dest). To see how this connects back to the groupcache.GetterFunc passed in when the Group instance was created in the demo code, let's mainly look at g.getLocally(ctx, key, dest):

func (g *Group) getLocally(ctx Context, key string, dest Sink) (ByteView, error) {
	err := g.getter.Get(ctx, key, dest)
	if err != nil {
		return ByteView{}, err
	}
	return dest.view()
}

Here g.getter is exactly the groupcache.GetterFunc argument; it defines how data is obtained locally, whether that means loading it from disk or fetching it from a database.
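To make that concrete, here is a sketch of an alternative getter that could be dropped into the example program above (the "users" group name and the fakeDB map are invented for illustration; fmt must be imported): the same GetterFunc hook can pull values from any backing store, not just the filesystem.

// fakeDB stands in for a real database in this illustration.
var fakeDB = map[string]string{
	"alice": `{"id":1,"name":"alice"}`,
	"bob":   `{"id":2,"name":"bob"}`,
}

var users = groupcache.NewGroup("users", 1<<20, groupcache.GetterFunc(
	func(ctx groupcache.Context, key string, dest groupcache.Sink) error {
		v, ok := fakeDB[key]
		if !ok {
			return fmt.Errorf("user %q not found", key)
		}
		return dest.SetString(v)
	}))

Once a value has been loaded this way, later Get calls for the same key are served from the local cache or from a peer without touching the backing store again.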

Summary

This post ran a Groupcache example to get familiar with operating this distributed KV cache, and walked through the relevant code to understand how a value is looked up.

References

https://talks.golang.org/2013/oscon-dl.slide#46