Merge pull request #781 from cpunion/future-io

Future IO update
This commit is contained in:
xushiwei
2024-09-12 12:48:25 +08:00
committed by GitHub
9 changed files with 290 additions and 131 deletions

View File

@@ -361,29 +361,83 @@ In some situations, you may want to get the first result of multiple async opera
## Design considerations in LLGo
- Don't introduce `async`/`await` keywords to compatible with Go compiler (just compiling)
- For performance reason don't implement async functions with goroutines
- Avoid implementing `Promise` by using `chan` to avoid blocking the thread, but it can be wrapped as a `chan` to make it compatible `select` statement
- Don't introduce `async`/`await` keywords to compatible with Go
- For performance and memory reasons don't implement async functions with goroutines, coroutines, or other mechanisms that require per-task stack allocation
- Avoid implementing async task by using `chan` that blocking the thread
## Design
Introduce `async.IO[T]` type to represent an asynchronous operation, `async.Future[T]` type to represent the result of an asynchronous operation. `async.IO[T]` can be `bind` to a function that accepts `T` as an argument to chain multiple asynchronous operations. `async.IO[T]` can be `await` to get the value of the asynchronous operation.
### `async.Future[T]` type
Introduce `async.Future[T]` type to represent an eventual completion (or failure) of an asynchronous operation and its resulting value, similar to `Promise`/`Future` in other languages. Functions that return `async.Future[T]` are considered asynchronous functions.
### Future creation
`async.Future[T]` can be created by `async.Async[T]` function that takes a function that accepts a `resolve` function to produce a value of type `T`.
### Future chaining (asynchronous callbacks style)
`async.Future[T]` can be chained with `Then` method to add multiple callbacks to be executed when the operation is completed, it just runs once and calls every callbacks. Currently `Then` method can't be chained multiple times because Go doesn't support generics method (Need support `func (f Future[T]) Then[U any](f func(T) Future[U]) Future[U]`), maybe implements in Go+.
### Future waiting (synchronous style)
`async.Await[T]` function can be used to wait for the completion of a `Future[T]` and return the value produced by the operation. In LLGo, `async.Await[T]` is a blocking function that waits for the completion of the `Future[T]` and returns the value synchronously, it would be transformed to `Future.Then` callback in the frontend.
### `async.Run[T]` function
`async.Run[T]` function can be used to create an global asynchronous context and run async functions, and it would be hidden by the compiler in the future.
Currently it will switch the callbacks to the goroutine that calls `async.Run[T]` function, this maybe changed in the future to reduce the overhead of switching goroutines and make it more parallel.
### Prototype
```go
package async
type Future[T any] func() T
type IO[T any] func() Future[T]
type Future[T any] interface {
Then(f func(T))
}
func Async[T any](f func(resolve func(T))) Future[T]
func Await[T any](future Future[T]) T
```
### Some async functions
```go
package async
func Race[T1 any](futures ...Future[T1]) Future[T1]
func All[T1 any](futures ...Future[T1]) Future[[]T1]
```
### Example
```go
package main
func main() {
io := func() Future[string] {
return func() string {
return "Hello, World!"
}
async.Run(func() {
hello := func() async.Future[string] {
return async.Async(func(resolve func(string)) {
resolve("Hello, World!")
})
}
future := io()
value := future()
println(value)
future := hello()
future.Then(func(value string) {
println("first callback:", value)
})
future.Then(func(value string) {
println("second callback:", value)
})
println("first await:", async.Await(future))
println("second await:", async.Await(future))
})
}
```

17
x/async/TODO.md Normal file
View File

@@ -0,0 +1,17 @@
讨论:
1. Future 用 interface 还是闭包:性能应该差不多,如果没有其他方法要暴露,感觉也没有换成 interface 的必要interface 多一个对象分配。先添加 Then 方法方便未来替换。
2. 几个方法提供不同参数个数的版本还是用 tuple如果编译器不支持可变泛型参数个数和特化我倾向用 tuple 先简化实现tuple 的开销应该也容易被编译器优化掉。多个方法让用户选择 Await2/Await3 这种也恶心。
3. 是否 Cancellable暂时不加进去多一个 context也不一定能快速稳定下来可以后面根据实践再改。
4. Executor 可能会变化,目前提供的 Run 是阻塞的,也可以把它做成异步。
5. 尽量再隐藏一些辅助类型,比如 TupleN可能之提供 tuple 的构造和返回多值。内部的 libuv 如果隐藏可能要暴露同等接口,先不动了
6. 性能可能做个简单测试,但不是关键,只要别太差。未来可能会尽量减少 executor 的切换、尽量多并行
7. 异常兼容性:目前没考虑,这个要在回调里处理可能困难,要么就在 await 上处理,可以往后放一下,毕竟 golang 主要是以 error 为主
8. 可能先看一下如何在 go+里面集成,判断目前的设计实现是否合理
9. 多封装一些库看看通用性和易用性,\_demo 里几个简单例子基本符合预期,还需要更多检验
TODO
[ ] 1. select 兼容 (可能把 Future 改为 interface 更合理?)
[x] 2. Future 多个 Await 只会被执行一次
[x] 3. Future 添加 Then 方法,不推荐直接当作函数调用,方便未来切换

View File

@@ -3,6 +3,7 @@ package main
import (
"fmt"
"os"
"sync/atomic"
"time"
"github.com/goplus/llgo/x/async"
@@ -32,26 +33,29 @@ func WriteFile(fileName string, content []byte) async.Future[error] {
func sleep(i int, d time.Duration) async.Future[int] {
return async.Async(func(resolve func(int)) {
timeout.Timeout(d)(func(async.Void) {
timeout.Timeout(d).Then(func(async.Void) {
resolve(i)
})
})
}
func main() {
async.Run(func(resolve func(async.Void)) {
RunIO()
RunAllAndRace()
RunTimeout()
RunMultipleCallbacksNodelay()
RunMultipleCallbacksDelay()
RunSocket()
})
}
func RunIO() {
println("RunIO with Await")
// Hide `resolve` in Go+
async.Run(async.Async(func(resolve func(async.Void)) {
println("read file")
defer resolve(async.Void{})
content, err := async.Await(ReadFile("all.go")).Get()
if err != nil {
fmt.Printf("read err: %v\n", err)
@@ -64,32 +68,26 @@ func RunIO() {
return
}
fmt.Printf("write done\n")
}))
// Translated Await to BindIO in Go+:
println("RunIO with BindIO")
async.Run(async.Async(func(resolve func(async.Void)) {
ReadFile("all.go")(func(v tuple.Tuple2[[]byte, error]) {
ReadFile("all.go").Then(func(v tuple.Tuple2[[]byte, error]) {
content, err := v.Get()
if err != nil {
fmt.Printf("read err: %v\n", err)
resolve(async.Void{})
return
}
fmt.Printf("read content: %s\n", content)
WriteFile("2.out", content)(func(v error) {
WriteFile("2.out", content).Then(func(v error) {
err = v
if err != nil {
fmt.Printf("write err: %v\n", err)
resolve(async.Void{})
return
}
println("write done")
resolve(async.Void{})
})
})
}))
}
func RunAllAndRace() {
@@ -99,92 +97,139 @@ func RunAllAndRace() {
println("Run All with Await")
async.Run(async.Async(func(resolve func(async.Void)) {
async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300))(func(v []int) {
async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)).Then(func(v []int) {
fmt.Printf("All: %v\n", v)
resolve(async.Void{})
})
}))
println("Run Race with Await")
async.Run(async.Async(func(resolve func(async.Void)) {
first := async.Race(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300))
v := async.Await(first)
fmt.Printf("Race: %v\n", v)
resolve(async.Void{})
}))
// Translated to in Go+:
println("Run All with BindIO")
async.Run(async.Async(func(resolve func(async.Void)) {
async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300))(func(v []int) {
async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)).Then(func(v []int) {
fmt.Printf("All: %v\n", v)
resolve(async.Void{})
})
}))
println("Run Race with BindIO")
async.Run(async.Async(func(resolve func(async.Void)) {
async.Race(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300))(func(v int) {
async.Race(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)).Then(func(v int) {
fmt.Printf("Race: %v\n", v)
resolve(async.Void{})
})
}))
}
func RunTimeout() {
println("Run Timeout with Await")
async.Run(async.Async(func(resolve func(async.Void)) {
fmt.Printf("Start 100 ms timeout\n")
async.Await(timeout.Timeout(100 * time.Millisecond))
fmt.Printf("timeout\n")
resolve(async.Void{})
}))
// Translated to in Go+:
println("Run Timeout with BindIO")
async.Run(async.Async(func(resolve func(async.Void)) {
fmt.Printf("Start 100 ms timeout\n")
timeout.Timeout(100 * time.Millisecond)(func(async.Void) {
timeout.Timeout(100 * time.Millisecond).Then(func(async.Void) {
fmt.Printf("timeout\n")
resolve(async.Void{})
})
}))
}
func RunMultipleCallbacksNodelay() {
println("Run Multiple Callbacks")
runCnt := atomic.Int32{}
nodelay := async.Async(func(resolve func(async.Void)) {
println("nodelay")
runCnt.Add(1)
})
cbCnt := atomic.Int32{}
cb := func() {
if cbCnt.Add(1) == 2 {
if runCnt.Load() != 1 {
panic("runCnt != 1, got: " + fmt.Sprint(runCnt.Load()))
} else {
println("runCnt == 1")
}
}
}
nodelay.Then(func(async.Void) {
println("nodelay done")
cb()
})
nodelay.Then(func(async.Void) {
println("nodelay done again")
cb()
})
}
func RunMultipleCallbacksDelay() {
println("Run Multiple Callbacks")
runCnt := atomic.Int32{}
delay := async.Async(func(resolve func(async.Void)) {
timeout.Timeout(100 * time.Millisecond).Then(func(async.Void) {
println("delay")
runCnt.Add(1)
})
})
cbCnt := atomic.Int32{}
cb := func() {
if cbCnt.Add(1) == 2 {
if runCnt.Load() != 1 {
panic("runCnt != 1, got: " + fmt.Sprint(runCnt.Load()))
} else {
println("runCnt == 1")
}
}
}
delay.Then(func(async.Void) {
println("delay done")
cb()
})
delay.Then(func(async.Void) {
println("delay done again")
cb()
})
}
func RunSocket() {
println("Run Socket")
async.Run(async.Async(func(resolve func(async.Void)) {
println("RunServer")
RunServer()(func(async.Void) {
RunServer().Then(func(async.Void) {
println("RunServer done")
resolve(async.Void{})
})
println("RunClient")
timeout.Timeout(100 * time.Millisecond)(func(async.Void) {
RunClient()(func(async.Void) {
timeout.Timeout(100 * time.Millisecond).Then(func(async.Void) {
RunClient("Bob").Then(func(async.Void) {
println("RunClient done")
})
RunClient("Uncle").Then(func(async.Void) {
println("RunClient done")
resolve(async.Void{})
})
})
}))
}
func RunClient() async.Future[async.Void] {
func RunClient(name string) async.Future[async.Void] {
return async.Async(func(resolve func(async.Void)) {
addr := "127.0.0.1:3927"
socketio.Connect("tcp", addr)(func(v tuple.Tuple2[*socketio.Conn, error]) {
socketio.Connect("tcp", addr).Then(func(v tuple.Tuple2[*socketio.Conn, error]) {
client, err := v.Get()
println("Connected", client, err)
if err != nil {
@@ -194,18 +239,18 @@ func RunClient() async.Future[async.Void] {
var loop func(client *socketio.Conn)
loop = func(client *socketio.Conn) {
counter++
data := fmt.Sprintf("Hello %d", counter)
client.Write([]byte(data))(func(err error) {
data := fmt.Sprintf("Hello from %s %d", name, counter)
client.Write([]byte(data)).Then(func(err error) {
if err != nil {
panic(err)
}
client.Read()(func(v tuple.Tuple2[[]byte, error]) {
client.Read().Then(func(v tuple.Tuple2[[]byte, error]) {
data, err := v.Get()
if err != nil {
panic(err)
}
println("Read from server:", string(data))
timeout.Timeout(1 * time.Second)(func(async.Void) {
timeout.Timeout(1 * time.Second).Then(func(async.Void) {
loop(client)
})
})
@@ -222,13 +267,13 @@ func RunServer() async.Future[async.Void] {
println("Client connected", client, err)
var loop func(client *socketio.Conn)
loop = func(client *socketio.Conn) {
client.Read()(func(v tuple.Tuple2[[]byte, error]) {
client.Read().Then(func(v tuple.Tuple2[[]byte, error]) {
data, err := v.Get()
if err != nil {
println("Read error", err)
} else {
println("Read from client:", string(data))
client.Write(data)(func(err error) {
client.Write(data).Then(func(err error) {
if err != nil {
println("Write error", err)
} else {

View File

@@ -24,6 +24,10 @@ type Void = [0]byte
type Future[T any] func(func(T))
func (f Future[T]) Then(cb func(T)) {
f(cb)
}
// Just for pure LLGo/Go, transpile to callback in Go+
func Await[T1 any](future Future[T1]) T1 {
return Run(future)

View File

@@ -22,8 +22,25 @@ package async
import "sync"
func Async[T any](fn func(func(T))) Future[T] {
var once sync.Once
var result T
var wg sync.WaitGroup
wg.Add(1)
once.Do(func() {
go func() {
fn(func(v T) {
result = v
wg.Done()
})
}()
})
return func(chain func(T)) {
go fn(chain)
go func() {
wg.Wait()
chain(result)
}()
}
}
@@ -34,7 +51,7 @@ func Race[T1 any](futures ...Future[T1]) Future[T1] {
ch := make(chan T1)
for _, future := range futures {
future := future
future(func(v T1) {
future.Then(func(v T1) {
defer func() {
// Avoid panic when the channel is closed.
_ = recover()
@@ -56,7 +73,7 @@ func All[T1 any](futures ...Future[T1]) Future[[]T1] {
wg.Add(n)
for i, future := range futures {
i := i
future(func(v T1) {
future.Then(func(v T1) {
results[i] = v
wg.Done()
})

View File

@@ -20,6 +20,7 @@
package async
import (
"sync"
"sync/atomic"
"github.com/goplus/llgo/c/libuv"
@@ -27,23 +28,44 @@ import (
)
// Currently Async run chain a future that call chain in the goroutine running `async.Run`.
// TODO(lijie): It would better to switch when needed.
func Async[T any](fn func(func(T))) Future[T] {
return func(chain func(T)) {
var result T
var resultReady atomic.Bool
var callbacks []func(T)
var mutex sync.Mutex
loop := Exec().L
var result T
var a *libuv.Async
var cb libuv.AsyncCb
a, cb = cbind.BindF[libuv.Async, libuv.AsyncCb](func(a *libuv.Async) {
a.Close(nil)
chain(result)
mutex.Lock()
currentCallbacks := callbacks
callbacks = nil
mutex.Unlock()
for _, callback := range currentCallbacks {
callback(result)
}
})
loop.Async(a, cb)
// Execute fn immediately
fn(func(v T) {
result = v
resultReady.Store(true)
a.Send()
})
return func(chain func(T)) {
mutex.Lock()
if resultReady.Load() {
mutex.Unlock()
chain(result)
} else {
callbacks = append(callbacks, chain)
mutex.Unlock()
}
}
}
@@ -53,7 +75,7 @@ func Race[T1 any](futures ...Future[T1]) Future[T1] {
return Async(func(resolve func(T1)) {
done := atomic.Bool{}
for _, future := range futures {
future(func(v T1) {
future.Then(func(v T1) {
if !done.Swap(true) {
// Just resolve the first one.
resolve(v)
@@ -70,7 +92,7 @@ func All[T1 any](futures ...Future[T1]) Future[[]T1] {
var done uint32
for i, future := range futures {
i := i
future(func(v T1) {
future.Then(func(v T1) {
results[i] = v
if atomic.AddUint32(&done, 1) == uint32(n) {
// All done.

View File

@@ -22,7 +22,7 @@ package async
func Run[T any](future Future[T]) T {
ch := make(chan T)
go func() {
future(func(v T) {
future.Then(func(v T) {
ch <- v
})
}()

View File

@@ -59,7 +59,7 @@ func Run[T any](future Future[T]) T {
exec := &Executor{loop}
oldExec := setExec(exec)
var ret T
future(func(v T) {
future.Then(func(v T) {
ret = v
})
exec.Run()

View File

@@ -104,7 +104,7 @@ func Listen(protocol, bindAddr string, listenCb func(client *Conn, err error)) {
listenCb(nil, err)
return
}
parseAddr(bindAddr)(func(v tuple.Tuple2[*net.SockAddr, error]) {
parseAddr(bindAddr).Then(func(v tuple.Tuple2[*net.SockAddr, error]) {
addr, err := v.Get()
if err != nil {
listenCb(nil, err)
@@ -167,7 +167,7 @@ func (l *Listener) accept() (client *Conn, err error) {
func Connect(network, addr string) async.Future[tuple.Tuple2[*Conn, error]] {
return async.Async(func(resolve func(tuple.Tuple2[*Conn, error])) {
parseAddr(addr)(func(v tuple.Tuple2[*net.SockAddr, error]) {
parseAddr(addr).Then(func(v tuple.Tuple2[*net.SockAddr, error]) {
addr, err := v.Get()
if err != nil {
resolve(tuple.T2[*Conn, error]((*Conn)(nil), err))