From 44617b6554ebb9a1c2bfa1bd4c7793fd3a725671 Mon Sep 17 00:00:00 2001 From: Li Jie Date: Mon, 9 Sep 2024 09:34:29 +0800 Subject: [PATCH] future supports multi-await but run once --- x/async/TODO.md | 7 ++-- x/async/_demo/all/all.go | 73 ++++++++++++++++++++++++++++++++++++++ x/async/async_go.go | 19 +++++++++- x/async/async_llgo.go | 76 ++++++++++++++++++++++++++++++++-------- 4 files changed, 156 insertions(+), 19 deletions(-) diff --git a/x/async/TODO.md b/x/async/TODO.md index 78a2373a..6da0db45 100644 --- a/x/async/TODO.md +++ b/x/async/TODO.md @@ -1,6 +1,6 @@ 讨论: -1. Future 用 interface 还是闭包:性能应该差不多,如果没有其他方法要暴露,感觉也没有换成 interface 的必要。 +1. Future 用 interface 还是闭包:性能应该差不多,如果没有其他方法要暴露,感觉也没有换成 interface 的必要,interface 多一个对象分配。先添加 Then 方法方便未来替换。 2. 几个方法提供不同参数个数的版本还是用 tuple:如果编译器不支持可变泛型参数个数和特化,我倾向用 tuple 先简化实现,tuple 的开销应该也容易被编译器优化掉。多个方法让用户选择 Await2/Await3 这种也恶心。 3. 是否 Cancellable,暂时不加进去,多一个 context,也不一定能快速稳定下来,可以后面根据实践再改。 4. Executor 可能会变化,目前提供的 Run 是阻塞的,也可以把它做成异步。 @@ -12,5 +12,6 @@ TODO: -10. select 兼容 (可能把 Future 改为 interface 更合理?) -11. Future 只会被执行一次 +[ ] 1. select 兼容 (可能把 Future 改为 interface 更合理?) +[x] 2. Future 多个 Await 只会被执行一次 +[x] 3. Future 添加 Then 方法,不推荐直接当作函数调用,方便未来切换 diff --git a/x/async/_demo/all/all.go b/x/async/_demo/all/all.go index 6c38e49c..80ce489f 100644 --- a/x/async/_demo/all/all.go +++ b/x/async/_demo/all/all.go @@ -3,6 +3,7 @@ package main import ( "fmt" "os" + "sync/atomic" "time" "github.com/goplus/llgo/x/async" @@ -42,6 +43,8 @@ func main() { RunIO() RunAllAndRace() RunTimeout() + RunMultipleCallbacksNodelay() + RunMultipleCallbacksDelay() RunSocket() } @@ -159,6 +162,76 @@ func RunTimeout() { })) } +func RunMultipleCallbacksNodelay() { + println("Run Multiple Callbacks") + + runCnt := atomic.Int32{} + async.Run(async.Async(func(resolve func(async.Void)) { + nodelay := async.Async(func(resolve func(async.Void)) { + println("nodelay") + runCnt.Add(1) + resolve(async.Void{}) + }) + + cbCnt := atomic.Int32{} + cb := func() { + if cbCnt.Add(1) == 2 { + resolve(async.Void{}) + } + } + + nodelay.Then(func(async.Void) { + println("nodelay done") + cb() + }) + + nodelay.Then(func(async.Void) { + println("nodelay done again") + cb() + }) + })) + + if runCnt.Load() != 1 { + panic("runCnt != 1") + } +} + +func RunMultipleCallbacksDelay() { + println("Run Multiple Callbacks") + + runCnt := atomic.Int32{} + async.Run(async.Async(func(resolve func(async.Void)) { + delay := async.Async(func(resolve func(async.Void)) { + timeout.Timeout(100 * time.Millisecond).Then(func(async.Void) { + println("delay") + runCnt.Add(1) + resolve(async.Void{}) + }) + }) + + cbCnt := atomic.Int32{} + cb := func() { + if cbCnt.Add(1) == 2 { + resolve(async.Void{}) + } + } + + delay.Then(func(async.Void) { + println("delay done") + cb() + }) + + delay.Then(func(async.Void) { + println("delay done again") + cb() + }) + })) + + if runCnt.Load() != 1 { + panic("runCnt != 1") + } +} + func RunSocket() { println("Run Socket") diff --git a/x/async/async_go.go b/x/async/async_go.go index 47f691c3..42addf15 100644 --- a/x/async/async_go.go +++ b/x/async/async_go.go @@ -22,8 +22,25 @@ package async import "sync" func Async[T any](fn func(func(T))) Future[T] { + var once sync.Once + var result T + var wg sync.WaitGroup + wg.Add(1) + return func(chain func(T)) { - go fn(chain) + once.Do(func() { + go func() { + fn(func(v T) { + result = v + wg.Done() + }) + }() + }) + + go func() { + wg.Wait() + chain(result) + }() } } diff --git a/x/async/async_llgo.go b/x/async/async_llgo.go index 9a7c7285..8f297db5 100644 --- a/x/async/async_llgo.go +++ b/x/async/async_llgo.go @@ -20,6 +20,7 @@ package async import ( + "sync" "sync/atomic" "github.com/goplus/llgo/c/libuv" @@ -27,23 +28,68 @@ import ( ) // Currently Async run chain a future that call chain in the goroutine running `async.Run`. -// TODO(lijie): It would better to switch when needed. -func Async[T any](fn func(func(T))) Future[T] { - return func(chain func(T)) { - loop := Exec().L +// Basic implementation: +// func Async[T any](fn func(func(T))) Future[T] { +// return func(chain func(T)) { +// loop := Exec().L - var result T - var a *libuv.Async - var cb libuv.AsyncCb - a, cb = cbind.BindF[libuv.Async, libuv.AsyncCb](func(a *libuv.Async) { - a.Close(nil) +// var result T +// var a *libuv.Async +// var cb libuv.AsyncCb +// a, cb = cbind.BindF[libuv.Async, libuv.AsyncCb](func(a *libuv.Async) { +// a.Close(nil) +// chain(result) +// }) +// loop.Async(a, cb) +// fn(func(v T) { +// result = v +// a.Send() +// }) +// } +// } +// +// Better implementation to support multiple callbacks: +func Async[T any](fn func(func(T))) Future[T] { + var result T + var done atomic.Bool + var resultReady atomic.Bool + var callbacks []func(T) + var mutex sync.Mutex + + return func(chain func(T)) { + mutex.Lock() + if resultReady.Load() { + mutex.Unlock() chain(result) - }) - loop.Async(a, cb) - fn(func(v T) { - result = v - a.Send() - }) + return + } + callbacks = append(callbacks, chain) + if !done.Swap(true) { + mutex.Unlock() + loop := Exec().L + + var a *libuv.Async + var cb libuv.AsyncCb + a, cb = cbind.BindF[libuv.Async, libuv.AsyncCb](func(a *libuv.Async) { + a.Close(nil) + mutex.Lock() + resultReady.Store(true) + currentCallbacks := callbacks + callbacks = nil + mutex.Unlock() + + for _, callback := range currentCallbacks { + callback(result) + } + }) + loop.Async(a, cb) + fn(func(v T) { + result = v + a.Send() + }) + } else { + mutex.Unlock() + } } }