future supports multi-await but run once

This commit is contained in:
Li Jie
2024-09-09 09:34:29 +08:00
parent ccc7d056ba
commit 44617b6554
4 changed files with 156 additions and 19 deletions

View File

@@ -1,6 +1,6 @@
讨论: 讨论:
1. Future 用 interface 还是闭包:性能应该差不多,如果没有其他方法要暴露,感觉也没有换成 interface 的必要。 1. Future 用 interface 还是闭包:性能应该差不多,如果没有其他方法要暴露,感觉也没有换成 interface 的必要interface 多一个对象分配。先添加 Then 方法方便未来替换
2. 几个方法提供不同参数个数的版本还是用 tuple如果编译器不支持可变泛型参数个数和特化我倾向用 tuple 先简化实现tuple 的开销应该也容易被编译器优化掉。多个方法让用户选择 Await2/Await3 这种也恶心。 2. 几个方法提供不同参数个数的版本还是用 tuple如果编译器不支持可变泛型参数个数和特化我倾向用 tuple 先简化实现tuple 的开销应该也容易被编译器优化掉。多个方法让用户选择 Await2/Await3 这种也恶心。
3. 是否 Cancellable暂时不加进去多一个 context也不一定能快速稳定下来可以后面根据实践再改。 3. 是否 Cancellable暂时不加进去多一个 context也不一定能快速稳定下来可以后面根据实践再改。
4. Executor 可能会变化,目前提供的 Run 是阻塞的,也可以把它做成异步。 4. Executor 可能会变化,目前提供的 Run 是阻塞的,也可以把它做成异步。
@@ -12,5 +12,6 @@
TODO TODO
10. select 兼容 (可能把 Future 改为 interface 更合理?) [ ] 1. select 兼容 (可能把 Future 改为 interface 更合理?)
11. Future 只会被执行一次 [x] 2. Future 多个 Await 只会被执行一次
[x] 3. Future 添加 Then 方法,不推荐直接当作函数调用,方便未来切换

View File

@@ -3,6 +3,7 @@ package main
import ( import (
"fmt" "fmt"
"os" "os"
"sync/atomic"
"time" "time"
"github.com/goplus/llgo/x/async" "github.com/goplus/llgo/x/async"
@@ -42,6 +43,8 @@ func main() {
RunIO() RunIO()
RunAllAndRace() RunAllAndRace()
RunTimeout() RunTimeout()
RunMultipleCallbacksNodelay()
RunMultipleCallbacksDelay()
RunSocket() RunSocket()
} }
@@ -159,6 +162,76 @@ func RunTimeout() {
})) }))
} }
func RunMultipleCallbacksNodelay() {
println("Run Multiple Callbacks")
runCnt := atomic.Int32{}
async.Run(async.Async(func(resolve func(async.Void)) {
nodelay := async.Async(func(resolve func(async.Void)) {
println("nodelay")
runCnt.Add(1)
resolve(async.Void{})
})
cbCnt := atomic.Int32{}
cb := func() {
if cbCnt.Add(1) == 2 {
resolve(async.Void{})
}
}
nodelay.Then(func(async.Void) {
println("nodelay done")
cb()
})
nodelay.Then(func(async.Void) {
println("nodelay done again")
cb()
})
}))
if runCnt.Load() != 1 {
panic("runCnt != 1")
}
}
func RunMultipleCallbacksDelay() {
println("Run Multiple Callbacks")
runCnt := atomic.Int32{}
async.Run(async.Async(func(resolve func(async.Void)) {
delay := async.Async(func(resolve func(async.Void)) {
timeout.Timeout(100 * time.Millisecond).Then(func(async.Void) {
println("delay")
runCnt.Add(1)
resolve(async.Void{})
})
})
cbCnt := atomic.Int32{}
cb := func() {
if cbCnt.Add(1) == 2 {
resolve(async.Void{})
}
}
delay.Then(func(async.Void) {
println("delay done")
cb()
})
delay.Then(func(async.Void) {
println("delay done again")
cb()
})
}))
if runCnt.Load() != 1 {
panic("runCnt != 1")
}
}
func RunSocket() { func RunSocket() {
println("Run Socket") println("Run Socket")

View File

@@ -22,8 +22,25 @@ package async
import "sync" import "sync"
func Async[T any](fn func(func(T))) Future[T] { func Async[T any](fn func(func(T))) Future[T] {
var once sync.Once
var result T
var wg sync.WaitGroup
wg.Add(1)
return func(chain func(T)) { return func(chain func(T)) {
go fn(chain) once.Do(func() {
go func() {
fn(func(v T) {
result = v
wg.Done()
})
}()
})
go func() {
wg.Wait()
chain(result)
}()
} }
} }

View File

@@ -20,6 +20,7 @@
package async package async
import ( import (
"sync"
"sync/atomic" "sync/atomic"
"github.com/goplus/llgo/c/libuv" "github.com/goplus/llgo/c/libuv"
@@ -27,23 +28,68 @@ import (
) )
// Currently Async run chain a future that call chain in the goroutine running `async.Run`. // Currently Async run chain a future that call chain in the goroutine running `async.Run`.
// TODO(lijie): It would better to switch when needed. // Basic implementation:
func Async[T any](fn func(func(T))) Future[T] { // func Async[T any](fn func(func(T))) Future[T] {
return func(chain func(T)) { // return func(chain func(T)) {
loop := Exec().L // loop := Exec().L
var result T // var result T
var a *libuv.Async // var a *libuv.Async
var cb libuv.AsyncCb // var cb libuv.AsyncCb
a, cb = cbind.BindF[libuv.Async, libuv.AsyncCb](func(a *libuv.Async) { // a, cb = cbind.BindF[libuv.Async, libuv.AsyncCb](func(a *libuv.Async) {
a.Close(nil) // a.Close(nil)
// chain(result)
// })
// loop.Async(a, cb)
// fn(func(v T) {
// result = v
// a.Send()
// })
// }
// }
//
// Better implementation to support multiple callbacks:
func Async[T any](fn func(func(T))) Future[T] {
var result T
var done atomic.Bool
var resultReady atomic.Bool
var callbacks []func(T)
var mutex sync.Mutex
return func(chain func(T)) {
mutex.Lock()
if resultReady.Load() {
mutex.Unlock()
chain(result) chain(result)
}) return
loop.Async(a, cb) }
fn(func(v T) { callbacks = append(callbacks, chain)
result = v if !done.Swap(true) {
a.Send() mutex.Unlock()
}) loop := Exec().L
var a *libuv.Async
var cb libuv.AsyncCb
a, cb = cbind.BindF[libuv.Async, libuv.AsyncCb](func(a *libuv.Async) {
a.Close(nil)
mutex.Lock()
resultReady.Store(true)
currentCallbacks := callbacks
callbacks = nil
mutex.Unlock()
for _, callback := range currentCallbacks {
callback(result)
}
})
loop.Async(a, cb)
fn(func(v T) {
result = v
a.Send()
})
} else {
mutex.Unlock()
}
} }
} }