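Hello, this is y1r. I recently competed in the ISUCON11 finals as part of team shallowverse. While reading other teams' blog posts, I came across a technique that uses x/sync/singleflight to merge identical requests into one. I didn't know about it, so I'm writing it up here.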
As the name suggests, singleflight merges calls that are in flight (that is, between the function call and its return) into a single call. Consider the following slow function:
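// Initialized at startup.
var startTime time.Time

// slowFunction sleeps for d and returns the time, relative to startTime,
// at which it was called.
func slowFunction(d time.Duration) string {
	calledAt := time.Since(startTime)
	time.Sleep(d)
	return fmt.Sprintf("%v", calledAt)
}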
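This function waits for the duration passed as its argument and returns the relative time at which it was called.
Calling it naively many times is slow, because every single call sleeps in time.Sleep.
Consider a situation where several request groups arrive in bursts: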
wg := sync.WaitGroup{}
sfGroup := singleflight.Group{}
for gid := 0; gid < 100; gid++ {
	dupName := "group"
	// Each request group issues 1 to 3 requests.
	requests := rand.Intn(3) + 1
	fmt.Printf("groupName: %s, requests: %d\n", dupName, requests)
	for requestId := 0; requestId < requests; requestId++ {
		wg.Add(1)
		go func() {
			// Concurrent calls with the same key share one execution.
			v, err, shared := sfGroup.Do(dupName, func() (interface{}, error) {
				return slowFunction(1 * time.Second), nil
			})
			fmt.Printf("called %v\treturned %s\terror %v\tshared %v\n", time.Since(startTime), v.(string), err, shared)
			wg.Done()
		}()
	}
	// Wait for the whole request group before starting the next one.
	wg.Wait()
	fmt.Printf("\n")
}
Here, sync.WaitGroup ensures that request groups with different gid values never call slowFunction at the same time.
Running this program prints:
groupName: group, requests: 3
called 1.000624867s returned 34.515µs error <nil> shared true
called 1.000654102s returned 34.515µs error <nil> shared true
called 1.000634555s returned 34.515µs error <nil> shared true
groupName: group, requests: 1
called 2.001261114s returned 1.000733631s error <nil> shared false
groupName: group, requests: 3
called 3.001915722s returned 2.001357044s error <nil> shared true
called 3.001958753s returned 2.001357044s error <nil> shared true
called 3.001979612s returned 2.001357044s error <nil> shared true
groupName: group, requests: 3
called 4.00240144s returned 3.002055625s error <nil> shared true
called 4.002421698s returned 3.002055625s error <nil> shared true
called 4.002429934s returned 3.002055625s error <nil> shared true
groupName: group, requests: 2
called 5.002686656s returned 4.002470009s error <nil> shared true
called 5.00270983s returned 4.002470009s error <nil> shared true
groupName: group, requests: 1
called 6.002864988s returned 5.002751398s error <nil> shared false
groupName: group, requests: 2
called 7.003451549s returned 6.002894483s error <nil> shared true
called 7.003472117s returned 6.002894483s error <nil> shared true
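Within a request group (same gid), every request gets back the same relative time (returned), which shows that the duplicate calls to slowFunction were suppressed.
The relative times (called) of successive request groups differ by about one second, so each request group is handled in roughly one second.
Next, consider the case where there are no request groups and requests simply keep arriving at a fixed rate: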
wg := sync.WaitGroup{}
sfGroup := singleflight.Group{}
dupName := "group"
for requestId := 0; requestId < 100; requestId++ {
	wg.Add(1)
	go func() {
		v, err, shared := sfGroup.Do(dupName, func() (interface{}, error) {
			return slowFunction(1 * time.Second), nil
		})
		fmt.Printf("called %v\treturned %s\terror %v\tshared %v\n", time.Since(startTime), v.(string), err, shared)
		wg.Done()
	}()
	// Issue the next request 100 ms later.
	time.Sleep(100 * time.Millisecond)
}
wg.Wait()
Here a 100 ms delay is inserted after each call, and slowFunction takes one second to run, so roughly 9 to 10 calls should be merged into a single execution of slowFunction.
Trying it out gives:
called 1.000606863s returned 17.643µs error <nil> shared true
called 1.000630588s returned 17.643µs error <nil> shared true
called 1.000628344s returned 17.643µs error <nil> shared true
called 1.000640427s returned 17.643µs error <nil> shared true
called 1.000653331s returned 17.643µs error <nil> shared true
called 1.000653581s returned 17.643µs error <nil> shared true
called 1.000657699s returned 17.643µs error <nil> shared true
called 1.000667367s returned 17.643µs error <nil> shared true
called 1.000671465s returned 17.643µs error <nil> shared true
called 1.000675923s returned 17.643µs error <nil> shared true
called 2.005459939s returned 1.004864502s error <nil> shared true
called 2.005478514s returned 1.004864502s error <nil> shared true
called 2.005486369s returned 1.004864502s error <nil> shared true
called 2.005489915s returned 1.004864502s error <nil> shared true
called 2.005496157s returned 1.004864502s error <nil> shared true
called 2.005502389s returned 1.004864502s error <nil> shared true
called 2.00550831s returned 1.004864502s error <nil> shared true
called 2.00551392s returned 1.004864502s error <nil> shared true
called 2.00552416s returned 1.004864502s error <nil> shared true
called 2.005537204s returned 1.004864502s error <nil> shared true
called 3.011355152s returned 2.011075391s error <nil> shared true
called 3.011377393s returned 2.011075391s error <nil> shared true
called 3.011390538s returned 2.011075391s error <nil> shared true
called 3.011398142s returned 2.011075391s error <nil> shared true
called 3.011404224s returned 2.011075391s error <nil> shared true
called 3.011413391s returned 2.011075391s error <nil> shared true
called 3.011419452s returned 2.011075391s error <nil> shared true
called 3.011430133s returned 2.011075391s error <nil> shared true
called 3.011444149s returned 2.011075391s error <nil> shared true
called 3.011451062s returned 2.011075391s error <nil> shared true
called 4.017036254s returned 3.016859766s error <nil> shared true
called 4.017055179s returned 3.016859766s error <nil> shared true
called 4.017063505s returned 3.016859766s error <nil> shared true
called 4.017069717s returned 3.016859766s error <nil> shared true
called 4.017075848s returned 3.016859766s error <nil> shared true
called 4.017081609s returned 3.016859766s error <nil> shared true
called 4.017053236s returned 3.016859766s error <nil> shared true
called 4.017088552s returned 3.016859766s error <nil> shared true
called 4.017121063s returned 3.016859766s error <nil> shared true
called 4.017093201s returned 3.016859766s error <nil> shared true
so the calls are merged as expected.
Finally, let's use singleflight.Group.Forget so that a result stops being reused after every fifth request:
wg := sync.WaitGroup{}
sfGroup := singleflight.Group{}
dupName := "group"
for requestId := 0; requestId < 100; requestId++ {
	wg.Add(1)
	// Every fifth request, forget the key so the next Do starts a new execution.
	if requestId%5 == 0 {
		sfGroup.Forget(dupName)
	}
	go func() {
		v, err, shared := sfGroup.Do(dupName, func() (interface{}, error) {
			return slowFunction(1 * time.Second), nil
		})
		fmt.Printf("called %v\treturned %s\terror %v\tshared %v\n", time.Since(startTime), v.(string), err, shared)
		wg.Done()
	}()
	time.Sleep(100 * time.Millisecond)
}
wg.Wait()
Running this gives the following.
called 1.000769355s returned 22.833µs error <nil> shared true
called 1.000786557s returned 22.833µs error <nil> shared true
called 1.000795694s returned 22.833µs error <nil> shared true
called 1.000802477s returned 22.833µs error <nil> shared true
called 1.000808128s returned 22.833µs error <nil> shared true
called 1.501880251s returned 501.120251ms error <nil> shared true
called 1.501889088s returned 501.120251ms error <nil> shared true
called 1.501925977s returned 501.120251ms error <nil> shared true
called 1.501917571s returned 501.120251ms error <nil> shared true
called 1.501933s returned 501.120251ms error <nil> shared true
called 2.005433391s returned 1.003911634s error <nil> shared true
called 2.00545396s returned 1.003911634s error <nil> shared true
called 2.005462005s returned 1.003911634s error <nil> shared true
called 2.005468076s returned 1.003911634s error <nil> shared true
called 2.005473897s returned 1.003911634s error <nil> shared true
called 2.505892995s returned 1.505474451s error <nil> shared true
called 2.505914877s returned 1.505474451s error <nil> shared true
called 2.505928252s returned 1.505474451s error <nil> shared true
called 2.505935866s returned 1.505474451s error <nil> shared true
called 2.505946476s returned 1.505474451s error <nil> shared true
This time the relative times (returned) advance in 500 ms steps, so what we have is no longer a singleflight but a doubleflight: two executions are in flight at any given moment.
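The batch boundaries in the last two experiments can be reproduced with a small idealized model. This is only a sketch: executionStarts and its parameters are hypothetical names introduced here, times are in milliseconds, scheduling jitter is ignored, and a request that arrives exactly when an execution ends is assumed to start a new one (in the real output, the drift of time.Sleep puts it just after the end).

```go
package main

import "fmt"

// executionStarts models requests arriving every interval ms while each
// execution takes duration ms. A request joins the execution currently
// registered under the key if it is still running; otherwise, or right
// after a Forget, it starts a new one. forgetEvery <= 0 disables Forget.
// The returned slice holds the start time of every execution.
func executionStarts(requests, interval, duration, forgetEvery int) []int {
	var starts []int
	current := -1 // start time of the registered execution, -1 if none
	for i := 0; i < requests; i++ {
		now := i * interval
		if current >= 0 && now >= current+duration {
			current = -1 // the execution finished and its entry was removed
		}
		if forgetEvery > 0 && i%forgetEvery == 0 {
			current = -1 // Forget drops the entry even if it is still running
		}
		if current < 0 {
			current = now
			starts = append(starts, now)
		}
	}
	return starts
}

func main() {
	// 100 ms apart, 1 s per execution, no Forget: one execution per 10 requests.
	fmt.Println(executionStarts(40, 100, 1000, 0)) // [0 1000 2000 3000]
	// Forget every 5 requests: a new execution every 500 ms.
	fmt.Println(executionStarts(20, 100, 1000, 5)) // [0 500 1000 1500]
}
```

The start times line up with the returned values in the two outputs above, up to a few milliseconds of drift.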
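I also read the implementation, and it is fairly simple (link).
A singleflight group holds a map[gid]call, that is, a map from the key to a call object.
The first call puts a call object (which will later hold the return value) into the map and then starts the actual computation.
Any other, duplicated call that finds a call object in the map takes a reference to it and waits on a sync.WaitGroup for the computation to finish.
The first call removes the call object from the map as soon as the computation finishes, so only duplicated calls that arrive during the computation can see the call object. Calls that arrive later (in this post, those belonging to the next request group) start the computation all over again.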
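The mechanism described above can be sketched with the standard library alone. This is a simplified illustration, not the actual x/sync/singleflight code: miniGroup, waiters, and demo are hypothetical names, and error handling, panics, and Forget are left out.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

// call is one execution of fn; duplicated callers wait on wg.
type call struct {
	wg   sync.WaitGroup
	val  int // result, written before wg.Done
	dups int // callers that joined while fn was running
}

// miniGroup is a hypothetical, simplified stand-in for singleflight.Group.
type miniGroup struct {
	mu sync.Mutex
	m  map[string]*call
}

// Do runs fn at most once at a time per key. shared reports whether the
// result was handed to more than one caller.
func (g *miniGroup) Do(key string, fn func() int) (val int, shared bool) {
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok { // a call is in flight: join it
		c.dups++
		g.mu.Unlock()
		c.wg.Wait()
		return c.val, true
	}
	c := &call{} // first caller: register the call, then compute
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

	c.val = fn()

	g.mu.Lock()
	delete(g.m, key) // later callers will no longer find this call
	shared = c.dups > 0
	g.mu.Unlock()
	c.wg.Done()
	return c.val, shared
}

// waiters reports how many callers have joined the in-flight call for key.
func (g *miniGroup) waiters(key string) int {
	g.mu.Lock()
	defer g.mu.Unlock()
	if c, ok := g.m[key]; ok {
		return c.dups
	}
	return 0
}

// demo returns the values seen by three concurrent callers and by one
// caller that arrives after the first execution has finished.
func demo() (concurrent [3]int, late int, lateShared bool) {
	var g miniGroup
	var execs int64
	release := make(chan struct{})
	slow := func() int {
		n := int(atomic.AddInt64(&execs, 1)) // number of this execution
		<-release                            // stay in flight until released
		return n
	}
	var wg sync.WaitGroup
	for i := range concurrent {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			concurrent[i], _ = g.Do("k", slow)
		}(i)
	}
	for g.waiters("k") < 2 { // wait until both duplicates have joined
		runtime.Gosched()
	}
	close(release)
	wg.Wait()
	late, lateShared = g.Do("k", slow) // the map entry is gone: computes again
	return
}

func main() {
	fmt.Println(demo()) // [1 1 1] 2 false
}
```

The three concurrent callers all receive the result of execution 1, while the late caller triggers execution 2 and does not share it with anyone.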