From 566d5ef96f6e742049b0c417a53bdf0862a0da97 Mon Sep 17 00:00:00 2001 From: Li Jie Date: Sun, 8 Sep 2024 20:27:05 +0800 Subject: [PATCH 1/7] add Future.Then --- x/{socketio => async}/README.md | 20 ++++++++++++-------- x/async/TODO.md | 16 ++++++++++++++++ x/async/_demo/all/all.go | 32 ++++++++++++++++---------------- x/async/async.go | 4 ++++ x/async/async_go.go | 4 ++-- x/async/async_llgo.go | 4 ++-- x/async/executor_go.go | 2 +- x/async/executor_llgo.go | 2 +- x/socketio/socketio_llgo.go | 4 ++-- 9 files changed, 56 insertions(+), 32 deletions(-) rename x/{socketio => async}/README.md (97%) create mode 100644 x/async/TODO.md diff --git a/x/socketio/README.md b/x/async/README.md similarity index 97% rename from x/socketio/README.md rename to x/async/README.md index da6a4c8e..f8ccc110 100644 --- a/x/socketio/README.md +++ b/x/async/README.md @@ -372,18 +372,22 @@ Introduce `async.IO[T]` type to represent an asynchronous operation, `async.Futu ```go package async -type Future[T any] func() T -type IO[T any] func() Future[T] +type Future[T any] func(func(T)) + +func Await[T any](future Future[T]) T func main() { - io := func() Future[string] { - return func() string { - return "Hello, World!" + hello := func() Future[string] { + return func(resolve func(string)) { + resolve("Hello, World!") } } - future := io() - value := future() - println(value) + future := hello() + future(func(value string) { + println(value) + }) + + println(Await(future)) } ``` diff --git a/x/async/TODO.md b/x/async/TODO.md new file mode 100644 index 00000000..78a2373a --- /dev/null +++ b/x/async/TODO.md @@ -0,0 +1,16 @@ +讨论: + +1. Future 用 interface 还是闭包:性能应该差不多,如果没有其他方法要暴露,感觉也没有换成 interface 的必要。 +2. 几个方法提供不同参数个数的版本还是用 tuple:如果编译器不支持可变泛型参数个数和特化,我倾向用 tuple 先简化实现,tuple 的开销应该也容易被编译器优化掉。多个方法让用户选择 Await2/Await3 这种也恶心。 +3. 是否 Cancellable,暂时不加进去,多一个 context,也不一定能快速稳定下来,可以后面根据实践再改。 +4. Executor 可能会变化,目前提供的 Run 是阻塞的,也可以把它做成异步。 +5. 尽量再隐藏一些辅助类型,比如 TupleN,可能之提供 tuple 的构造和返回多值。内部的 libuv 如果隐藏可能要暴露同等接口,先不动了 +6. 性能可能做个简单测试,但不是关键,只要别太差。未来可能会尽量减少 executor 的切换、尽量多并行 +7. 异常兼容性:目前没考虑,这个要在回调里处理可能困难,要么就在 await 上处理,可以往后放一下,毕竟 golang 主要是以 error 为主 +8. 可能先看一下如何在 go+里面集成,判断目前的设计实现是否合理 +9. 多封装一些库看看通用性和易用性,\_demo 里几个简单例子基本符合预期,还需要更多检验 + +TODO: + +10. select 兼容 (可能把 Future 改为 interface 更合理?) +11. Future 只会被执行一次 diff --git a/x/async/_demo/all/all.go b/x/async/_demo/all/all.go index 0f78e648..004ba402 100644 --- a/x/async/_demo/all/all.go +++ b/x/async/_demo/all/all.go @@ -32,7 +32,7 @@ func WriteFile(fileName string, content []byte) async.Future[error] { func sleep(i int, d time.Duration) async.Future[int] { return async.Async(func(resolve func(int)) { - timeout.Timeout(d)(func(async.Void) { + timeout.Timeout(d).Then(func(async.Void) { resolve(i) }) }) @@ -70,7 +70,7 @@ func RunIO() { println("RunIO with BindIO") async.Run(async.Async(func(resolve func(async.Void)) { - ReadFile("all.go")(func(v tuple.Tuple2[[]byte, error]) { + ReadFile("all.go").Then(func(v tuple.Tuple2[[]byte, error]) { content, err := v.Get() if err != nil { fmt.Printf("read err: %v\n", err) @@ -78,7 +78,7 @@ func RunIO() { return } fmt.Printf("read content: %s\n", content) - WriteFile("2.out", content)(func(v error) { + WriteFile("2.out", content).Then(func(v error) { err = v if err != nil { fmt.Printf("write err: %v\n", err) @@ -100,7 +100,7 @@ func RunAllAndRace() { println("Run All with Await") async.Run(async.Async(func(resolve func(async.Void)) { - async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300))(func(v []int) { + async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)).Then(func(v []int) { fmt.Printf("All: %v\n", v) resolve(async.Void{}) }) @@ -120,7 +120,7 @@ func RunAllAndRace() { println("Run All with BindIO") async.Run(async.Async(func(resolve func(async.Void)) { - async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300))(func(v []int) { + async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)).Then(func(v []int) { fmt.Printf("All: %v\n", v) resolve(async.Void{}) }) @@ -129,7 +129,7 @@ func RunAllAndRace() { println("Run Race with BindIO") async.Run(async.Async(func(resolve func(async.Void)) { - async.Race(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300))(func(v int) { + async.Race(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)).Then(func(v int) { fmt.Printf("Race: %v\n", v) resolve(async.Void{}) }) @@ -152,7 +152,7 @@ func RunTimeout() { async.Run(async.Async(func(resolve func(async.Void)) { fmt.Printf("Start 100 ms timeout\n") - timeout.Timeout(100 * time.Millisecond)(func(async.Void) { + timeout.Timeout(100 * time.Millisecond).Then(func(async.Void) { fmt.Printf("timeout\n") resolve(async.Void{}) }) @@ -165,15 +165,15 @@ func RunSocket() { async.Run(async.Async(func(resolve func(async.Void)) { println("RunServer") - RunServer()(func(async.Void) { + RunServer().Then(func(async.Void) { println("RunServer done") resolve(async.Void{}) }) println("RunClient") - timeout.Timeout(100 * time.Millisecond)(func(async.Void) { - RunClient()(func(async.Void) { + timeout.Timeout(100 * time.Millisecond).Then(func(async.Void) { + RunClient().Then(func(async.Void) { println("RunClient done") resolve(async.Void{}) }) @@ -184,7 +184,7 @@ func RunSocket() { func RunClient() async.Future[async.Void] { return async.Async(func(resolve func(async.Void)) { addr := "127.0.0.1:3927" - socketio.Connect("tcp", addr)(func(v tuple.Tuple2[*socketio.Conn, error]) { + socketio.Connect("tcp", addr).Then(func(v tuple.Tuple2[*socketio.Conn, error]) { client, err := v.Get() println("Connected", client, err) if err != nil { @@ -195,17 +195,17 @@ func RunClient() async.Future[async.Void] { loop = func(client *socketio.Conn) { counter++ data := fmt.Sprintf("Hello %d", counter) - client.Write([]byte(data))(func(err error) { + client.Write([]byte(data)).Then(func(err error) { if err != nil { panic(err) } - client.Read()(func(v tuple.Tuple2[[]byte, error]) { + client.Read().Then(func(v tuple.Tuple2[[]byte, error]) { data, err := v.Get() if err != nil { panic(err) } println("Read from server:", string(data)) - timeout.Timeout(1 * time.Second)(func(async.Void) { + timeout.Timeout(1 * time.Second).Then(func(async.Void) { loop(client) }) }) @@ -222,13 +222,13 @@ func RunServer() async.Future[async.Void] { println("Client connected", client, err) var loop func(client *socketio.Conn) loop = func(client *socketio.Conn) { - client.Read()(func(v tuple.Tuple2[[]byte, error]) { + client.Read().Then(func(v tuple.Tuple2[[]byte, error]) { data, err := v.Get() if err != nil { println("Read error", err) } else { println("Read from client:", string(data)) - client.Write(data)(func(err error) { + client.Write(data).Then(func(err error) { if err != nil { println("Write error", err) } else { diff --git a/x/async/async.go b/x/async/async.go index fcd9a2e9..6b93f0e7 100644 --- a/x/async/async.go +++ b/x/async/async.go @@ -24,6 +24,10 @@ type Void = [0]byte type Future[T any] func(func(T)) +func (f Future[T]) Then(cb func(T)) { + f(cb) +} + // Just for pure LLGo/Go, transpile to callback in Go+ func Await[T1 any](future Future[T1]) T1 { return Run(future) diff --git a/x/async/async_go.go b/x/async/async_go.go index c775b7e8..47f691c3 100644 --- a/x/async/async_go.go +++ b/x/async/async_go.go @@ -34,7 +34,7 @@ func Race[T1 any](futures ...Future[T1]) Future[T1] { ch := make(chan T1) for _, future := range futures { future := future - future(func(v T1) { + future.Then(func(v T1) { defer func() { // Avoid panic when the channel is closed. _ = recover() @@ -56,7 +56,7 @@ func All[T1 any](futures ...Future[T1]) Future[[]T1] { wg.Add(n) for i, future := range futures { i := i - future(func(v T1) { + future.Then(func(v T1) { results[i] = v wg.Done() }) diff --git a/x/async/async_llgo.go b/x/async/async_llgo.go index d301b20c..9a7c7285 100644 --- a/x/async/async_llgo.go +++ b/x/async/async_llgo.go @@ -53,7 +53,7 @@ func Race[T1 any](futures ...Future[T1]) Future[T1] { return Async(func(resolve func(T1)) { done := atomic.Bool{} for _, future := range futures { - future(func(v T1) { + future.Then(func(v T1) { if !done.Swap(true) { // Just resolve the first one. resolve(v) @@ -70,7 +70,7 @@ func All[T1 any](futures ...Future[T1]) Future[[]T1] { var done uint32 for i, future := range futures { i := i - future(func(v T1) { + future.Then(func(v T1) { results[i] = v if atomic.AddUint32(&done, 1) == uint32(n) { // All done. diff --git a/x/async/executor_go.go b/x/async/executor_go.go index 60962638..38667aa7 100644 --- a/x/async/executor_go.go +++ b/x/async/executor_go.go @@ -22,7 +22,7 @@ package async func Run[T any](future Future[T]) T { ch := make(chan T) go func() { - future(func(v T) { + future.Then(func(v T) { ch <- v }) }() diff --git a/x/async/executor_llgo.go b/x/async/executor_llgo.go index e1e03a6a..af28295f 100644 --- a/x/async/executor_llgo.go +++ b/x/async/executor_llgo.go @@ -59,7 +59,7 @@ func Run[T any](future Future[T]) T { exec := &Executor{loop} oldExec := setExec(exec) var ret T - future(func(v T) { + future.Then(func(v T) { ret = v }) exec.Run() diff --git a/x/socketio/socketio_llgo.go b/x/socketio/socketio_llgo.go index e6b799a5..970e70c6 100644 --- a/x/socketio/socketio_llgo.go +++ b/x/socketio/socketio_llgo.go @@ -104,7 +104,7 @@ func Listen(protocol, bindAddr string, listenCb func(client *Conn, err error)) { listenCb(nil, err) return } - parseAddr(bindAddr)(func(v tuple.Tuple2[*net.SockAddr, error]) { + parseAddr(bindAddr).Then(func(v tuple.Tuple2[*net.SockAddr, error]) { addr, err := v.Get() if err != nil { listenCb(nil, err) @@ -167,7 +167,7 @@ func (l *Listener) accept() (client *Conn, err error) { func Connect(network, addr string) async.Future[tuple.Tuple2[*Conn, error]] { return async.Async(func(resolve func(tuple.Tuple2[*Conn, error])) { - parseAddr(addr)(func(v tuple.Tuple2[*net.SockAddr, error]) { + parseAddr(addr).Then(func(v tuple.Tuple2[*net.SockAddr, error]) { addr, err := v.Get() if err != nil { resolve(tuple.T2[*Conn, error]((*Conn)(nil), err)) From ccc7d056ba830d9aab9534d4400fd1db4cc1c276 Mon Sep 17 00:00:00 2001 From: Li Jie Date: Sun, 8 Sep 2024 20:29:24 +0800 Subject: [PATCH 2/7] socketio example: two tcp clients --- x/async/_demo/all/all.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/x/async/_demo/all/all.go b/x/async/_demo/all/all.go index 004ba402..6c38e49c 100644 --- a/x/async/_demo/all/all.go +++ b/x/async/_demo/all/all.go @@ -173,7 +173,11 @@ func RunSocket() { println("RunClient") timeout.Timeout(100 * time.Millisecond).Then(func(async.Void) { - RunClient().Then(func(async.Void) { + RunClient("Bob").Then(func(async.Void) { + println("RunClient done") + resolve(async.Void{}) + }) + RunClient("Uncle").Then(func(async.Void) { println("RunClient done") resolve(async.Void{}) }) @@ -181,7 +185,7 @@ func RunSocket() { })) } -func RunClient() async.Future[async.Void] { +func RunClient(name string) async.Future[async.Void] { return async.Async(func(resolve func(async.Void)) { addr := "127.0.0.1:3927" socketio.Connect("tcp", addr).Then(func(v tuple.Tuple2[*socketio.Conn, error]) { @@ -194,7 +198,7 @@ func RunClient() async.Future[async.Void] { var loop func(client *socketio.Conn) loop = func(client *socketio.Conn) { counter++ - data := fmt.Sprintf("Hello %d", counter) + data := fmt.Sprintf("Hello from %s %d", name, counter) client.Write([]byte(data)).Then(func(err error) { if err != nil { panic(err) From 44617b6554ebb9a1c2bfa1bd4c7793fd3a725671 Mon Sep 17 00:00:00 2001 From: Li Jie Date: Mon, 9 Sep 2024 09:34:29 +0800 Subject: [PATCH 3/7] future supports multi-await but run once --- x/async/TODO.md | 7 ++-- x/async/_demo/all/all.go | 73 ++++++++++++++++++++++++++++++++++++++ x/async/async_go.go | 19 +++++++++- x/async/async_llgo.go | 76 ++++++++++++++++++++++++++++++++-------- 4 files changed, 156 insertions(+), 19 deletions(-) diff --git a/x/async/TODO.md b/x/async/TODO.md index 78a2373a..6da0db45 100644 --- a/x/async/TODO.md +++ b/x/async/TODO.md @@ -1,6 +1,6 @@ 讨论: -1. Future 用 interface 还是闭包:性能应该差不多,如果没有其他方法要暴露,感觉也没有换成 interface 的必要。 +1. Future 用 interface 还是闭包:性能应该差不多,如果没有其他方法要暴露,感觉也没有换成 interface 的必要,interface 多一个对象分配。先添加 Then 方法方便未来替换。 2. 几个方法提供不同参数个数的版本还是用 tuple:如果编译器不支持可变泛型参数个数和特化,我倾向用 tuple 先简化实现,tuple 的开销应该也容易被编译器优化掉。多个方法让用户选择 Await2/Await3 这种也恶心。 3. 是否 Cancellable,暂时不加进去,多一个 context,也不一定能快速稳定下来,可以后面根据实践再改。 4. Executor 可能会变化,目前提供的 Run 是阻塞的,也可以把它做成异步。 @@ -12,5 +12,6 @@ TODO: -10. select 兼容 (可能把 Future 改为 interface 更合理?) -11. Future 只会被执行一次 +[ ] 1. select 兼容 (可能把 Future 改为 interface 更合理?) +[x] 2. Future 多个 Await 只会被执行一次 +[x] 3. Future 添加 Then 方法,不推荐直接当作函数调用,方便未来切换 diff --git a/x/async/_demo/all/all.go b/x/async/_demo/all/all.go index 6c38e49c..80ce489f 100644 --- a/x/async/_demo/all/all.go +++ b/x/async/_demo/all/all.go @@ -3,6 +3,7 @@ package main import ( "fmt" "os" + "sync/atomic" "time" "github.com/goplus/llgo/x/async" @@ -42,6 +43,8 @@ func main() { RunIO() RunAllAndRace() RunTimeout() + RunMultipleCallbacksNodelay() + RunMultipleCallbacksDelay() RunSocket() } @@ -159,6 +162,76 @@ func RunTimeout() { })) } +func RunMultipleCallbacksNodelay() { + println("Run Multiple Callbacks") + + runCnt := atomic.Int32{} + async.Run(async.Async(func(resolve func(async.Void)) { + nodelay := async.Async(func(resolve func(async.Void)) { + println("nodelay") + runCnt.Add(1) + resolve(async.Void{}) + }) + + cbCnt := atomic.Int32{} + cb := func() { + if cbCnt.Add(1) == 2 { + resolve(async.Void{}) + } + } + + nodelay.Then(func(async.Void) { + println("nodelay done") + cb() + }) + + nodelay.Then(func(async.Void) { + println("nodelay done again") + cb() + }) + })) + + if runCnt.Load() != 1 { + panic("runCnt != 1") + } +} + +func RunMultipleCallbacksDelay() { + println("Run Multiple Callbacks") + + runCnt := atomic.Int32{} + async.Run(async.Async(func(resolve func(async.Void)) { + delay := async.Async(func(resolve func(async.Void)) { + timeout.Timeout(100 * time.Millisecond).Then(func(async.Void) { + println("delay") + runCnt.Add(1) + resolve(async.Void{}) + }) + }) + + cbCnt := atomic.Int32{} + cb := func() { + if cbCnt.Add(1) == 2 { + resolve(async.Void{}) + } + } + + delay.Then(func(async.Void) { + println("delay done") + cb() + }) + + delay.Then(func(async.Void) { + println("delay done again") + cb() + }) + })) + + if runCnt.Load() != 1 { + panic("runCnt != 1") + } +} + func RunSocket() { println("Run Socket") diff --git a/x/async/async_go.go b/x/async/async_go.go index 47f691c3..42addf15 100644 --- a/x/async/async_go.go +++ b/x/async/async_go.go @@ -22,8 +22,25 @@ package async import "sync" func Async[T any](fn func(func(T))) Future[T] { + var once sync.Once + var result T + var wg sync.WaitGroup + wg.Add(1) + return func(chain func(T)) { - go fn(chain) + once.Do(func() { + go func() { + fn(func(v T) { + result = v + wg.Done() + }) + }() + }) + + go func() { + wg.Wait() + chain(result) + }() } } diff --git a/x/async/async_llgo.go b/x/async/async_llgo.go index 9a7c7285..8f297db5 100644 --- a/x/async/async_llgo.go +++ b/x/async/async_llgo.go @@ -20,6 +20,7 @@ package async import ( + "sync" "sync/atomic" "github.com/goplus/llgo/c/libuv" @@ -27,23 +28,68 @@ import ( ) // Currently Async run chain a future that call chain in the goroutine running `async.Run`. -// TODO(lijie): It would better to switch when needed. -func Async[T any](fn func(func(T))) Future[T] { - return func(chain func(T)) { - loop := Exec().L +// Basic implementation: +// func Async[T any](fn func(func(T))) Future[T] { +// return func(chain func(T)) { +// loop := Exec().L - var result T - var a *libuv.Async - var cb libuv.AsyncCb - a, cb = cbind.BindF[libuv.Async, libuv.AsyncCb](func(a *libuv.Async) { - a.Close(nil) +// var result T +// var a *libuv.Async +// var cb libuv.AsyncCb +// a, cb = cbind.BindF[libuv.Async, libuv.AsyncCb](func(a *libuv.Async) { +// a.Close(nil) +// chain(result) +// }) +// loop.Async(a, cb) +// fn(func(v T) { +// result = v +// a.Send() +// }) +// } +// } +// +// Better implementation to support multiple callbacks: +func Async[T any](fn func(func(T))) Future[T] { + var result T + var done atomic.Bool + var resultReady atomic.Bool + var callbacks []func(T) + var mutex sync.Mutex + + return func(chain func(T)) { + mutex.Lock() + if resultReady.Load() { + mutex.Unlock() chain(result) - }) - loop.Async(a, cb) - fn(func(v T) { - result = v - a.Send() - }) + return + } + callbacks = append(callbacks, chain) + if !done.Swap(true) { + mutex.Unlock() + loop := Exec().L + + var a *libuv.Async + var cb libuv.AsyncCb + a, cb = cbind.BindF[libuv.Async, libuv.AsyncCb](func(a *libuv.Async) { + a.Close(nil) + mutex.Lock() + resultReady.Store(true) + currentCallbacks := callbacks + callbacks = nil + mutex.Unlock() + + for _, callback := range currentCallbacks { + callback(result) + } + }) + loop.Async(a, cb) + fn(func(v T) { + result = v + a.Send() + }) + } else { + mutex.Unlock() + } } } From 44c4488fcc3ede09448952e77b745c33d3d901b3 Mon Sep 17 00:00:00 2001 From: Li Jie Date: Mon, 9 Sep 2024 10:41:22 +0800 Subject: [PATCH 4/7] async doc update --- x/async/README.md | 44 +++++++++++++++++++++++++++++++++++++++----- 1 file changed, 39 insertions(+), 5 deletions(-) diff --git a/x/async/README.md b/x/async/README.md index f8ccc110..058da95d 100644 --- a/x/async/README.md +++ b/x/async/README.md @@ -362,19 +362,49 @@ In some situations, you may want to get the first result of multiple async opera ## Design considerations in LLGo - Don't introduce `async`/`await` keywords to compatible with Go compiler (just compiling) -- For performance reason don't implement async functions with goroutines +- For performance and memory reasons don't implement async functions with goroutines - Avoid implementing `Promise` by using `chan` to avoid blocking the thread, but it can be wrapped as a `chan` to make it compatible `select` statement ## Design -Introduce `async.IO[T]` type to represent an asynchronous operation, `async.Future[T]` type to represent the result of an asynchronous operation. `async.IO[T]` can be `bind` to a function that accepts `T` as an argument to chain multiple asynchronous operations. `async.IO[T]` can be `await` to get the value of the asynchronous operation. +### `async.Future[T]` type + +Introduce `async.Future[T]` type to represent an asynchronous operation that will produce a value of type `T`, similar to `Promise`/`Future` in other languages. + +### Future creation + +`async.Future[T]` can create by `async.Async[T]` function that takes a function that accepts a `resolve` function to produce a value of type `T`. + +### Future chaining (asynchronous callbacks style) + +`async.Future[T]` can be chained with `Then` method to add multiple callbacks to be executed when the operation is completed, it just runs once and calls every callbacks. `Then` method can't be chained multiple times because Go doesn't support generics method currently (Need support `func (f Future[T]) Then[U any](f func(T) Future[U]) Future[U]`). + +### Future waiting (synchronous style) + +`async.Await[T]` function can be used to wait for the completion of a `Future[T]` and return the value produced by the operation. + +### `async.Run[T]` function + +`async.Run[T]` function can be used to run the asynchronous operation and get the result of the operation. Currently it will switch the callbacks to the goroutine that calls `async.Run[T]` function, this maybe changed in the future to reduce the overhead of switching goroutines and make it more parallel. + +### `async.Future[T]` prototype ```go package async type Future[T any] func(func(T)) +func (f Future[T]) Then(f func(T)) + +func Async[T any](f func(resolve func(T))) Future[T] + func Await[T any](future Future[T]) T +``` + +### Example + +```go +package main func main() { hello := func() Future[string] { @@ -384,10 +414,14 @@ func main() { } future := hello() - future(func(value string) { - println(value) + future.Then(func(value string) { + println("first callback:", value) + }) + future.Then(func(value string) { + println("second callback:", value) }) - println(Await(future)) + println("first await:", Await(future)) + println("second await:", Await(future)) } ``` From 12f460e3765c648c78276ce2d0e47b7af05bf7f2 Mon Sep 17 00:00:00 2001 From: Li Jie Date: Tue, 10 Sep 2024 11:43:44 +0800 Subject: [PATCH 5/7] async.Run as global context, async operations run immediately --- x/async/README.md | 50 +++++--- x/async/_demo/all/all.go | 254 +++++++++++++++++---------------------- x/async/async_go.go | 22 ++-- x/async/async_llgo.go | 72 ++++------- 4 files changed, 178 insertions(+), 220 deletions(-) diff --git a/x/async/README.md b/x/async/README.md index 058da95d..db79c8d5 100644 --- a/x/async/README.md +++ b/x/async/README.md @@ -361,56 +361,71 @@ In some situations, you may want to get the first result of multiple async opera ## Design considerations in LLGo -- Don't introduce `async`/`await` keywords to compatible with Go compiler (just compiling) -- For performance and memory reasons don't implement async functions with goroutines -- Avoid implementing `Promise` by using `chan` to avoid blocking the thread, but it can be wrapped as a `chan` to make it compatible `select` statement +- Don't introduce `async`/`await` keywords to compatible with Go +- For performance and memory reasons don't implement async functions with goroutines, coroutines, or other mechanisms that require per-task stack allocation +- Avoid implementing async task by using `chan` that blocking the thread ## Design ### `async.Future[T]` type -Introduce `async.Future[T]` type to represent an asynchronous operation that will produce a value of type `T`, similar to `Promise`/`Future` in other languages. +Introduce `async.Future[T]` type to represent an eventual completion (or failure) of an asynchronous operation and its resulting value, similar to `Promise`/`Future` in other languages. Functions that return `async.Future[T]` are considered asynchronous functions. ### Future creation -`async.Future[T]` can create by `async.Async[T]` function that takes a function that accepts a `resolve` function to produce a value of type `T`. +`async.Future[T]` can be created by `async.Async[T]` function that takes a function that accepts a `resolve` function to produce a value of type `T`. ### Future chaining (asynchronous callbacks style) -`async.Future[T]` can be chained with `Then` method to add multiple callbacks to be executed when the operation is completed, it just runs once and calls every callbacks. `Then` method can't be chained multiple times because Go doesn't support generics method currently (Need support `func (f Future[T]) Then[U any](f func(T) Future[U]) Future[U]`). +`async.Future[T]` can be chained with `Then` method to add multiple callbacks to be executed when the operation is completed, it just runs once and calls every callbacks. Currently `Then` method can't be chained multiple times because Go doesn't support generics method (Need support `func (f Future[T]) Then[U any](f func(T) Future[U]) Future[U]`) and function overload currently, maybe implements in Go+. ### Future waiting (synchronous style) -`async.Await[T]` function can be used to wait for the completion of a `Future[T]` and return the value produced by the operation. +`async.Await[T]` function can be used to wait for the completion of a `Future[T]` and return the value produced by the operation. In LLGo, `async.Await[T]` is a blocking function that waits for the completion of the `Future[T]` and returns the value synchronously, it would be transformed to `Future.Then` callback in the frontend. ### `async.Run[T]` function -`async.Run[T]` function can be used to run the asynchronous operation and get the result of the operation. Currently it will switch the callbacks to the goroutine that calls `async.Run[T]` function, this maybe changed in the future to reduce the overhead of switching goroutines and make it more parallel. +`async.Run[T]` function can be used to create an global asynchronous context and run async functions, and it would be hidden by the compiler in the future. -### `async.Future[T]` prototype +Currently it will switch the callbacks to the goroutine that calls `async.Run[T]` function, this maybe changed in the future to reduce the overhead of switching goroutines and make it more parallel. + +### Prototype ```go package async -type Future[T any] func(func(T)) - -func (f Future[T]) Then(f func(T)) +type Future[T any] interface { + Then(f func(T)) +} func Async[T any](f func(resolve func(T))) Future[T] func Await[T any](future Future[T]) T ``` +### Some async functions + +```go +package async + + +func Race[T1 any](futures ...Future[T1]) Future[T1] + +func All[T1 any](futures ...Future[T1]) Future[[]T1] + +``` + ### Example ```go package main func main() { - hello := func() Future[string] { - return func(resolve func(string)) { + async.Run(func() { + hello := func() async.Future[string] { + return async.Async(func(resolve func(string)) { resolve("Hello, World!") - } + }) } future := hello() @@ -421,7 +436,8 @@ func main() { println("second callback:", value) }) - println("first await:", Await(future)) - println("second await:", Await(future)) + println("first await:", async.Await(future)) + println("second await:", async.Await(future)) + }) } ``` diff --git a/x/async/_demo/all/all.go b/x/async/_demo/all/all.go index 80ce489f..652bed28 100644 --- a/x/async/_demo/all/all.go +++ b/x/async/_demo/all/all.go @@ -40,59 +40,54 @@ func sleep(i int, d time.Duration) async.Future[int] { } func main() { - RunIO() - RunAllAndRace() - RunTimeout() - RunMultipleCallbacksNodelay() - RunMultipleCallbacksDelay() - RunSocket() + async.Run(func(resolve func(async.Void)) { + RunIO() + RunAllAndRace() + RunTimeout() + RunMultipleCallbacksNodelay() + RunMultipleCallbacksDelay() + RunSocket() + }) } func RunIO() { println("RunIO with Await") // Hide `resolve` in Go+ - async.Run(async.Async(func(resolve func(async.Void)) { - println("read file") - defer resolve(async.Void{}) - content, err := async.Await(ReadFile("all.go")).Get() + + println("read file") + content, err := async.Await(ReadFile("all.go")).Get() + if err != nil { + fmt.Printf("read err: %v\n", err) + return + } + fmt.Printf("read content: %s\n", content) + err = async.Await(WriteFile("2.out", content)) + if err != nil { + fmt.Printf("write err: %v\n", err) + return + } + fmt.Printf("write done\n") + + // Translated Await to BindIO in Go+: + println("RunIO with BindIO") + + ReadFile("all.go").Then(func(v tuple.Tuple2[[]byte, error]) { + content, err := v.Get() if err != nil { fmt.Printf("read err: %v\n", err) return } fmt.Printf("read content: %s\n", content) - err = async.Await(WriteFile("2.out", content)) - if err != nil { - fmt.Printf("write err: %v\n", err) - return - } - fmt.Printf("write done\n") - })) - - // Translated Await to BindIO in Go+: - println("RunIO with BindIO") - - async.Run(async.Async(func(resolve func(async.Void)) { - ReadFile("all.go").Then(func(v tuple.Tuple2[[]byte, error]) { - content, err := v.Get() + WriteFile("2.out", content).Then(func(v error) { + err = v if err != nil { - fmt.Printf("read err: %v\n", err) - resolve(async.Void{}) + fmt.Printf("write err: %v\n", err) return } - fmt.Printf("read content: %s\n", content) - WriteFile("2.out", content).Then(func(v error) { - err = v - if err != nil { - fmt.Printf("write err: %v\n", err) - resolve(async.Void{}) - return - } - println("write done") - resolve(async.Void{}) - }) + println("write done") }) - })) + }) } func RunAllAndRace() { @@ -102,160 +97,133 @@ func RunAllAndRace() { println("Run All with Await") - async.Run(async.Async(func(resolve func(async.Void)) { - async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)).Then(func(v []int) { - fmt.Printf("All: %v\n", v) - resolve(async.Void{}) - }) - })) + async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)).Then(func(v []int) { + fmt.Printf("All: %v\n", v) + }) println("Run Race with Await") - async.Run(async.Async(func(resolve func(async.Void)) { - first := async.Race(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)) - v := async.Await(first) - fmt.Printf("Race: %v\n", v) - resolve(async.Void{}) - })) + first := async.Race(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)) + v := async.Await(first) + fmt.Printf("Race: %v\n", v) // Translated to in Go+: println("Run All with BindIO") - async.Run(async.Async(func(resolve func(async.Void)) { - async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)).Then(func(v []int) { - fmt.Printf("All: %v\n", v) - resolve(async.Void{}) - }) - })) + async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)).Then(func(v []int) { + fmt.Printf("All: %v\n", v) + }) println("Run Race with BindIO") - async.Run(async.Async(func(resolve func(async.Void)) { - async.Race(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)).Then(func(v int) { - fmt.Printf("Race: %v\n", v) - resolve(async.Void{}) - }) - })) + async.Race(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)).Then(func(v int) { + fmt.Printf("Race: %v\n", v) + }) + } func RunTimeout() { println("Run Timeout with Await") - async.Run(async.Async(func(resolve func(async.Void)) { - fmt.Printf("Start 100 ms timeout\n") - async.Await(timeout.Timeout(100 * time.Millisecond)) - fmt.Printf("timeout\n") - resolve(async.Void{}) - })) + fmt.Printf("Start 100 ms timeout\n") + async.Await(timeout.Timeout(100 * time.Millisecond)) + fmt.Printf("timeout\n") // Translated to in Go+: println("Run Timeout with BindIO") - async.Run(async.Async(func(resolve func(async.Void)) { - fmt.Printf("Start 100 ms timeout\n") - timeout.Timeout(100 * time.Millisecond).Then(func(async.Void) { - fmt.Printf("timeout\n") - resolve(async.Void{}) - }) - })) + fmt.Printf("Start 100 ms timeout\n") + timeout.Timeout(100 * time.Millisecond).Then(func(async.Void) { + fmt.Printf("timeout\n") + }) } func RunMultipleCallbacksNodelay() { println("Run Multiple Callbacks") runCnt := atomic.Int32{} - async.Run(async.Async(func(resolve func(async.Void)) { - nodelay := async.Async(func(resolve func(async.Void)) { - println("nodelay") - runCnt.Add(1) - resolve(async.Void{}) - }) - cbCnt := atomic.Int32{} - cb := func() { - if cbCnt.Add(1) == 2 { - resolve(async.Void{}) + nodelay := async.Async(func(resolve func(async.Void)) { + println("nodelay") + runCnt.Add(1) + }) + + cbCnt := atomic.Int32{} + cb := func() { + if cbCnt.Add(1) == 2 { + if runCnt.Load() != 1 { + panic("runCnt != 1, got: " + fmt.Sprint(runCnt.Load())) + } else { + println("runCnt == 1") } } - - nodelay.Then(func(async.Void) { - println("nodelay done") - cb() - }) - - nodelay.Then(func(async.Void) { - println("nodelay done again") - cb() - }) - })) - - if runCnt.Load() != 1 { - panic("runCnt != 1") } + nodelay.Then(func(async.Void) { + println("nodelay done") + cb() + }) + + nodelay.Then(func(async.Void) { + println("nodelay done again") + cb() + }) } func RunMultipleCallbacksDelay() { println("Run Multiple Callbacks") runCnt := atomic.Int32{} - async.Run(async.Async(func(resolve func(async.Void)) { - delay := async.Async(func(resolve func(async.Void)) { - timeout.Timeout(100 * time.Millisecond).Then(func(async.Void) { - println("delay") - runCnt.Add(1) - resolve(async.Void{}) - }) - }) - cbCnt := atomic.Int32{} - cb := func() { - if cbCnt.Add(1) == 2 { - resolve(async.Void{}) + delay := async.Async(func(resolve func(async.Void)) { + timeout.Timeout(100 * time.Millisecond).Then(func(async.Void) { + println("delay") + runCnt.Add(1) + }) + }) + + cbCnt := atomic.Int32{} + cb := func() { + if cbCnt.Add(1) == 2 { + if runCnt.Load() != 1 { + panic("runCnt != 1, got: " + fmt.Sprint(runCnt.Load())) + } else { + println("runCnt == 1") } } - - delay.Then(func(async.Void) { - println("delay done") - cb() - }) - - delay.Then(func(async.Void) { - println("delay done again") - cb() - }) - })) - - if runCnt.Load() != 1 { - panic("runCnt != 1") } + + delay.Then(func(async.Void) { + println("delay done") + cb() + }) + + delay.Then(func(async.Void) { + println("delay done again") + cb() + }) } func RunSocket() { println("Run Socket") - async.Run(async.Async(func(resolve func(async.Void)) { - println("RunServer") + println("RunServer") - RunServer().Then(func(async.Void) { - println("RunServer done") - resolve(async.Void{}) + RunServer().Then(func(async.Void) { + println("RunServer done") + }) + + println("RunClient") + + timeout.Timeout(100 * time.Millisecond).Then(func(async.Void) { + RunClient("Bob").Then(func(async.Void) { + println("RunClient done") }) - - println("RunClient") - - timeout.Timeout(100 * time.Millisecond).Then(func(async.Void) { - RunClient("Bob").Then(func(async.Void) { - println("RunClient done") - resolve(async.Void{}) - }) - RunClient("Uncle").Then(func(async.Void) { - println("RunClient done") - resolve(async.Void{}) - }) + RunClient("Uncle").Then(func(async.Void) { + println("RunClient done") }) - })) + }) } func RunClient(name string) async.Future[async.Void] { diff --git a/x/async/async_go.go b/x/async/async_go.go index 42addf15..7f905e80 100644 --- a/x/async/async_go.go +++ b/x/async/async_go.go @@ -27,20 +27,18 @@ func Async[T any](fn func(func(T))) Future[T] { var wg sync.WaitGroup wg.Add(1) - return func(chain func(T)) { - once.Do(func() { - go func() { - fn(func(v T) { - result = v - wg.Done() - }) - }() - }) - + once.Do(func() { go func() { - wg.Wait() - chain(result) + fn(func(v T) { + result = v + wg.Done() + }) }() + }) + + return func(chain func(T)) { + wg.Wait() + chain(result) } } diff --git a/x/async/async_llgo.go b/x/async/async_llgo.go index 8f297db5..b6bad936 100644 --- a/x/async/async_llgo.go +++ b/x/async/async_llgo.go @@ -28,66 +28,42 @@ import ( ) // Currently Async run chain a future that call chain in the goroutine running `async.Run`. -// Basic implementation: -// func Async[T any](fn func(func(T))) Future[T] { -// return func(chain func(T)) { -// loop := Exec().L - -// var result T -// var a *libuv.Async -// var cb libuv.AsyncCb -// a, cb = cbind.BindF[libuv.Async, libuv.AsyncCb](func(a *libuv.Async) { -// a.Close(nil) -// chain(result) -// }) -// loop.Async(a, cb) -// fn(func(v T) { -// result = v -// a.Send() -// }) -// } -// } -// -// Better implementation to support multiple callbacks: func Async[T any](fn func(func(T))) Future[T] { var result T - var done atomic.Bool var resultReady atomic.Bool var callbacks []func(T) var mutex sync.Mutex + loop := Exec().L + + var a *libuv.Async + var cb libuv.AsyncCb + a, cb = cbind.BindF[libuv.Async, libuv.AsyncCb](func(a *libuv.Async) { + a.Close(nil) + mutex.Lock() + currentCallbacks := callbacks + callbacks = nil + mutex.Unlock() + + for _, callback := range currentCallbacks { + callback(result) + } + }) + loop.Async(a, cb) + + // Execute fn immediately + fn(func(v T) { + result = v + resultReady.Store(true) + a.Send() + }) return func(chain func(T)) { mutex.Lock() if resultReady.Load() { mutex.Unlock() chain(result) - return - } - callbacks = append(callbacks, chain) - if !done.Swap(true) { - mutex.Unlock() - loop := Exec().L - - var a *libuv.Async - var cb libuv.AsyncCb - a, cb = cbind.BindF[libuv.Async, libuv.AsyncCb](func(a *libuv.Async) { - a.Close(nil) - mutex.Lock() - resultReady.Store(true) - currentCallbacks := callbacks - callbacks = nil - mutex.Unlock() - - for _, callback := range currentCallbacks { - callback(result) - } - }) - loop.Async(a, cb) - fn(func(v T) { - result = v - a.Send() - }) } else { + callbacks = append(callbacks, chain) mutex.Unlock() } } From 3f9e86c37a54ab70965d480ce0bb538b4c18222a Mon Sep 17 00:00:00 2001 From: Li Jie Date: Tue, 10 Sep 2024 11:49:42 +0800 Subject: [PATCH 6/7] x --- x/async/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x/async/README.md b/x/async/README.md index db79c8d5..7714af41 100644 --- a/x/async/README.md +++ b/x/async/README.md @@ -377,7 +377,7 @@ Introduce `async.Future[T]` type to represent an eventual completion (or failure ### Future chaining (asynchronous callbacks style) -`async.Future[T]` can be chained with `Then` method to add multiple callbacks to be executed when the operation is completed, it just runs once and calls every callbacks. Currently `Then` method can't be chained multiple times because Go doesn't support generics method (Need support `func (f Future[T]) Then[U any](f func(T) Future[U]) Future[U]`) and function overload currently, maybe implements in Go+. +`async.Future[T]` can be chained with `Then` method to add multiple callbacks to be executed when the operation is completed, it just runs once and calls every callbacks. Currently `Then` method can't be chained multiple times because Go doesn't support generics method (Need support `func (f Future[T]) Then[U any](f func(T) Future[U]) Future[U]`), maybe implements in Go+. ### Future waiting (synchronous style) From 7f4022120e479129e8be3bce966293026e7c7f6c Mon Sep 17 00:00:00 2001 From: Li Jie Date: Tue, 10 Sep 2024 14:38:46 +0800 Subject: [PATCH 7/7] fix deadlock --- x/async/async_go.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/x/async/async_go.go b/x/async/async_go.go index 7f905e80..4de78af3 100644 --- a/x/async/async_go.go +++ b/x/async/async_go.go @@ -37,8 +37,10 @@ func Async[T any](fn func(func(T))) Future[T] { }) return func(chain func(T)) { - wg.Wait() - chain(result) + go func() { + wg.Wait() + chain(result) + }() } }