diff --git a/x/socketio/README.md b/x/async/README.md similarity index 78% rename from x/socketio/README.md rename to x/async/README.md index da6a4c8e..7714af41 100644 --- a/x/socketio/README.md +++ b/x/async/README.md @@ -361,29 +361,83 @@ In some situations, you may want to get the first result of multiple async opera ## Design considerations in LLGo -- Don't introduce `async`/`await` keywords to compatible with Go compiler (just compiling) -- For performance reason don't implement async functions with goroutines -- Avoid implementing `Promise` by using `chan` to avoid blocking the thread, but it can be wrapped as a `chan` to make it compatible `select` statement +- Don't introduce `async`/`await` keywords to compatible with Go +- For performance and memory reasons don't implement async functions with goroutines, coroutines, or other mechanisms that require per-task stack allocation +- Avoid implementing async task by using `chan` that blocking the thread ## Design -Introduce `async.IO[T]` type to represent an asynchronous operation, `async.Future[T]` type to represent the result of an asynchronous operation. `async.IO[T]` can be `bind` to a function that accepts `T` as an argument to chain multiple asynchronous operations. `async.IO[T]` can be `await` to get the value of the asynchronous operation. +### `async.Future[T]` type + +Introduce `async.Future[T]` type to represent an eventual completion (or failure) of an asynchronous operation and its resulting value, similar to `Promise`/`Future` in other languages. Functions that return `async.Future[T]` are considered asynchronous functions. + +### Future creation + +`async.Future[T]` can be created by `async.Async[T]` function that takes a function that accepts a `resolve` function to produce a value of type `T`. + +### Future chaining (asynchronous callbacks style) + +`async.Future[T]` can be chained with `Then` method to add multiple callbacks to be executed when the operation is completed, it just runs once and calls every callbacks. Currently `Then` method can't be chained multiple times because Go doesn't support generics method (Need support `func (f Future[T]) Then[U any](f func(T) Future[U]) Future[U]`), maybe implements in Go+. + +### Future waiting (synchronous style) + +`async.Await[T]` function can be used to wait for the completion of a `Future[T]` and return the value produced by the operation. In LLGo, `async.Await[T]` is a blocking function that waits for the completion of the `Future[T]` and returns the value synchronously, it would be transformed to `Future.Then` callback in the frontend. + +### `async.Run[T]` function + +`async.Run[T]` function can be used to create an global asynchronous context and run async functions, and it would be hidden by the compiler in the future. + +Currently it will switch the callbacks to the goroutine that calls `async.Run[T]` function, this maybe changed in the future to reduce the overhead of switching goroutines and make it more parallel. + +### Prototype ```go package async -type Future[T any] func() T -type IO[T any] func() Future[T] +type Future[T any] interface { + Then(f func(T)) +} + +func Async[T any](f func(resolve func(T))) Future[T] + +func Await[T any](future Future[T]) T +``` + +### Some async functions + +```go +package async + + +func Race[T1 any](futures ...Future[T1]) Future[T1] + +func All[T1 any](futures ...Future[T1]) Future[[]T1] + +``` + +### Example + +```go +package main func main() { - io := func() Future[string] { - return func() string { - return "Hello, World!" - } + async.Run(func() { + hello := func() async.Future[string] { + return async.Async(func(resolve func(string)) { + resolve("Hello, World!") + }) } - future := io() - value := future() - println(value) + future := hello() + future.Then(func(value string) { + println("first callback:", value) + }) + future.Then(func(value string) { + println("second callback:", value) + }) + + println("first await:", async.Await(future)) + println("second await:", async.Await(future)) + }) } ``` diff --git a/x/async/TODO.md b/x/async/TODO.md new file mode 100644 index 00000000..6da0db45 --- /dev/null +++ b/x/async/TODO.md @@ -0,0 +1,17 @@ +讨论: + +1. Future 用 interface 还是闭包:性能应该差不多,如果没有其他方法要暴露,感觉也没有换成 interface 的必要,interface 多一个对象分配。先添加 Then 方法方便未来替换。 +2. 几个方法提供不同参数个数的版本还是用 tuple:如果编译器不支持可变泛型参数个数和特化,我倾向用 tuple 先简化实现,tuple 的开销应该也容易被编译器优化掉。多个方法让用户选择 Await2/Await3 这种也恶心。 +3. 是否 Cancellable,暂时不加进去,多一个 context,也不一定能快速稳定下来,可以后面根据实践再改。 +4. Executor 可能会变化,目前提供的 Run 是阻塞的,也可以把它做成异步。 +5. 尽量再隐藏一些辅助类型,比如 TupleN,可能之提供 tuple 的构造和返回多值。内部的 libuv 如果隐藏可能要暴露同等接口,先不动了 +6. 性能可能做个简单测试,但不是关键,只要别太差。未来可能会尽量减少 executor 的切换、尽量多并行 +7. 异常兼容性:目前没考虑,这个要在回调里处理可能困难,要么就在 await 上处理,可以往后放一下,毕竟 golang 主要是以 error 为主 +8. 可能先看一下如何在 go+里面集成,判断目前的设计实现是否合理 +9. 多封装一些库看看通用性和易用性,\_demo 里几个简单例子基本符合预期,还需要更多检验 + +TODO: + +[ ] 1. select 兼容 (可能把 Future 改为 interface 更合理?) +[x] 2. Future 多个 Await 只会被执行一次 +[x] 3. Future 添加 Then 方法,不推荐直接当作函数调用,方便未来切换 diff --git a/x/async/_demo/all/all.go b/x/async/_demo/all/all.go index 0f78e648..652bed28 100644 --- a/x/async/_demo/all/all.go +++ b/x/async/_demo/all/all.go @@ -3,6 +3,7 @@ package main import ( "fmt" "os" + "sync/atomic" "time" "github.com/goplus/llgo/x/async" @@ -32,64 +33,61 @@ func WriteFile(fileName string, content []byte) async.Future[error] { func sleep(i int, d time.Duration) async.Future[int] { return async.Async(func(resolve func(int)) { - timeout.Timeout(d)(func(async.Void) { + timeout.Timeout(d).Then(func(async.Void) { resolve(i) }) }) } func main() { - RunIO() - RunAllAndRace() - RunTimeout() - RunSocket() + async.Run(func(resolve func(async.Void)) { + RunIO() + RunAllAndRace() + RunTimeout() + RunMultipleCallbacksNodelay() + RunMultipleCallbacksDelay() + RunSocket() + }) } func RunIO() { println("RunIO with Await") // Hide `resolve` in Go+ - async.Run(async.Async(func(resolve func(async.Void)) { - println("read file") - defer resolve(async.Void{}) - content, err := async.Await(ReadFile("all.go")).Get() + + println("read file") + content, err := async.Await(ReadFile("all.go")).Get() + if err != nil { + fmt.Printf("read err: %v\n", err) + return + } + fmt.Printf("read content: %s\n", content) + err = async.Await(WriteFile("2.out", content)) + if err != nil { + fmt.Printf("write err: %v\n", err) + return + } + fmt.Printf("write done\n") + + // Translated Await to BindIO in Go+: + println("RunIO with BindIO") + + ReadFile("all.go").Then(func(v tuple.Tuple2[[]byte, error]) { + content, err := v.Get() if err != nil { fmt.Printf("read err: %v\n", err) return } fmt.Printf("read content: %s\n", content) - err = async.Await(WriteFile("2.out", content)) - if err != nil { - fmt.Printf("write err: %v\n", err) - return - } - fmt.Printf("write done\n") - })) - - // Translated Await to BindIO in Go+: - println("RunIO with BindIO") - - async.Run(async.Async(func(resolve func(async.Void)) { - ReadFile("all.go")(func(v tuple.Tuple2[[]byte, error]) { - content, err := v.Get() + WriteFile("2.out", content).Then(func(v error) { + err = v if err != nil { - fmt.Printf("read err: %v\n", err) - resolve(async.Void{}) + fmt.Printf("write err: %v\n", err) return } - fmt.Printf("read content: %s\n", content) - WriteFile("2.out", content)(func(v error) { - err = v - if err != nil { - fmt.Printf("write err: %v\n", err) - resolve(async.Void{}) - return - } - println("write done") - resolve(async.Void{}) - }) + println("write done") }) - })) + }) } func RunAllAndRace() { @@ -99,92 +97,139 @@ func RunAllAndRace() { println("Run All with Await") - async.Run(async.Async(func(resolve func(async.Void)) { - async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300))(func(v []int) { - fmt.Printf("All: %v\n", v) - resolve(async.Void{}) - }) - })) + async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)).Then(func(v []int) { + fmt.Printf("All: %v\n", v) + }) println("Run Race with Await") - async.Run(async.Async(func(resolve func(async.Void)) { - first := async.Race(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)) - v := async.Await(first) - fmt.Printf("Race: %v\n", v) - resolve(async.Void{}) - })) + first := async.Race(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)) + v := async.Await(first) + fmt.Printf("Race: %v\n", v) // Translated to in Go+: println("Run All with BindIO") - async.Run(async.Async(func(resolve func(async.Void)) { - async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300))(func(v []int) { - fmt.Printf("All: %v\n", v) - resolve(async.Void{}) - }) - })) + async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)).Then(func(v []int) { + fmt.Printf("All: %v\n", v) + }) println("Run Race with BindIO") - async.Run(async.Async(func(resolve func(async.Void)) { - async.Race(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300))(func(v int) { - fmt.Printf("Race: %v\n", v) - resolve(async.Void{}) - }) - })) + async.Race(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)).Then(func(v int) { + fmt.Printf("Race: %v\n", v) + }) + } func RunTimeout() { println("Run Timeout with Await") - async.Run(async.Async(func(resolve func(async.Void)) { - fmt.Printf("Start 100 ms timeout\n") - async.Await(timeout.Timeout(100 * time.Millisecond)) - fmt.Printf("timeout\n") - resolve(async.Void{}) - })) + fmt.Printf("Start 100 ms timeout\n") + async.Await(timeout.Timeout(100 * time.Millisecond)) + fmt.Printf("timeout\n") // Translated to in Go+: println("Run Timeout with BindIO") - async.Run(async.Async(func(resolve func(async.Void)) { - fmt.Printf("Start 100 ms timeout\n") - timeout.Timeout(100 * time.Millisecond)(func(async.Void) { - fmt.Printf("timeout\n") - resolve(async.Void{}) + fmt.Printf("Start 100 ms timeout\n") + timeout.Timeout(100 * time.Millisecond).Then(func(async.Void) { + fmt.Printf("timeout\n") + }) +} + +func RunMultipleCallbacksNodelay() { + println("Run Multiple Callbacks") + + runCnt := atomic.Int32{} + + nodelay := async.Async(func(resolve func(async.Void)) { + println("nodelay") + runCnt.Add(1) + }) + + cbCnt := atomic.Int32{} + cb := func() { + if cbCnt.Add(1) == 2 { + if runCnt.Load() != 1 { + panic("runCnt != 1, got: " + fmt.Sprint(runCnt.Load())) + } else { + println("runCnt == 1") + } + } + } + nodelay.Then(func(async.Void) { + println("nodelay done") + cb() + }) + + nodelay.Then(func(async.Void) { + println("nodelay done again") + cb() + }) +} + +func RunMultipleCallbacksDelay() { + println("Run Multiple Callbacks") + + runCnt := atomic.Int32{} + + delay := async.Async(func(resolve func(async.Void)) { + timeout.Timeout(100 * time.Millisecond).Then(func(async.Void) { + println("delay") + runCnt.Add(1) }) - })) + }) + + cbCnt := atomic.Int32{} + cb := func() { + if cbCnt.Add(1) == 2 { + if runCnt.Load() != 1 { + panic("runCnt != 1, got: " + fmt.Sprint(runCnt.Load())) + } else { + println("runCnt == 1") + } + } + } + + delay.Then(func(async.Void) { + println("delay done") + cb() + }) + + delay.Then(func(async.Void) { + println("delay done again") + cb() + }) } func RunSocket() { println("Run Socket") - async.Run(async.Async(func(resolve func(async.Void)) { - println("RunServer") + println("RunServer") - RunServer()(func(async.Void) { - println("RunServer done") - resolve(async.Void{}) + RunServer().Then(func(async.Void) { + println("RunServer done") + }) + + println("RunClient") + + timeout.Timeout(100 * time.Millisecond).Then(func(async.Void) { + RunClient("Bob").Then(func(async.Void) { + println("RunClient done") }) - - println("RunClient") - - timeout.Timeout(100 * time.Millisecond)(func(async.Void) { - RunClient()(func(async.Void) { - println("RunClient done") - resolve(async.Void{}) - }) + RunClient("Uncle").Then(func(async.Void) { + println("RunClient done") }) - })) + }) } -func RunClient() async.Future[async.Void] { +func RunClient(name string) async.Future[async.Void] { return async.Async(func(resolve func(async.Void)) { addr := "127.0.0.1:3927" - socketio.Connect("tcp", addr)(func(v tuple.Tuple2[*socketio.Conn, error]) { + socketio.Connect("tcp", addr).Then(func(v tuple.Tuple2[*socketio.Conn, error]) { client, err := v.Get() println("Connected", client, err) if err != nil { @@ -194,18 +239,18 @@ func RunClient() async.Future[async.Void] { var loop func(client *socketio.Conn) loop = func(client *socketio.Conn) { counter++ - data := fmt.Sprintf("Hello %d", counter) - client.Write([]byte(data))(func(err error) { + data := fmt.Sprintf("Hello from %s %d", name, counter) + client.Write([]byte(data)).Then(func(err error) { if err != nil { panic(err) } - client.Read()(func(v tuple.Tuple2[[]byte, error]) { + client.Read().Then(func(v tuple.Tuple2[[]byte, error]) { data, err := v.Get() if err != nil { panic(err) } println("Read from server:", string(data)) - timeout.Timeout(1 * time.Second)(func(async.Void) { + timeout.Timeout(1 * time.Second).Then(func(async.Void) { loop(client) }) }) @@ -222,13 +267,13 @@ func RunServer() async.Future[async.Void] { println("Client connected", client, err) var loop func(client *socketio.Conn) loop = func(client *socketio.Conn) { - client.Read()(func(v tuple.Tuple2[[]byte, error]) { + client.Read().Then(func(v tuple.Tuple2[[]byte, error]) { data, err := v.Get() if err != nil { println("Read error", err) } else { println("Read from client:", string(data)) - client.Write(data)(func(err error) { + client.Write(data).Then(func(err error) { if err != nil { println("Write error", err) } else { diff --git a/x/async/async.go b/x/async/async.go index fcd9a2e9..6b93f0e7 100644 --- a/x/async/async.go +++ b/x/async/async.go @@ -24,6 +24,10 @@ type Void = [0]byte type Future[T any] func(func(T)) +func (f Future[T]) Then(cb func(T)) { + f(cb) +} + // Just for pure LLGo/Go, transpile to callback in Go+ func Await[T1 any](future Future[T1]) T1 { return Run(future) diff --git a/x/async/async_go.go b/x/async/async_go.go index c775b7e8..4de78af3 100644 --- a/x/async/async_go.go +++ b/x/async/async_go.go @@ -22,8 +22,25 @@ package async import "sync" func Async[T any](fn func(func(T))) Future[T] { + var once sync.Once + var result T + var wg sync.WaitGroup + wg.Add(1) + + once.Do(func() { + go func() { + fn(func(v T) { + result = v + wg.Done() + }) + }() + }) + return func(chain func(T)) { - go fn(chain) + go func() { + wg.Wait() + chain(result) + }() } } @@ -34,7 +51,7 @@ func Race[T1 any](futures ...Future[T1]) Future[T1] { ch := make(chan T1) for _, future := range futures { future := future - future(func(v T1) { + future.Then(func(v T1) { defer func() { // Avoid panic when the channel is closed. _ = recover() @@ -56,7 +73,7 @@ func All[T1 any](futures ...Future[T1]) Future[[]T1] { wg.Add(n) for i, future := range futures { i := i - future(func(v T1) { + future.Then(func(v T1) { results[i] = v wg.Done() }) diff --git a/x/async/async_llgo.go b/x/async/async_llgo.go index d301b20c..b6bad936 100644 --- a/x/async/async_llgo.go +++ b/x/async/async_llgo.go @@ -20,6 +20,7 @@ package async import ( + "sync" "sync/atomic" "github.com/goplus/llgo/c/libuv" @@ -27,23 +28,44 @@ import ( ) // Currently Async run chain a future that call chain in the goroutine running `async.Run`. -// TODO(lijie): It would better to switch when needed. func Async[T any](fn func(func(T))) Future[T] { - return func(chain func(T)) { - loop := Exec().L + var result T + var resultReady atomic.Bool + var callbacks []func(T) + var mutex sync.Mutex + loop := Exec().L - var result T - var a *libuv.Async - var cb libuv.AsyncCb - a, cb = cbind.BindF[libuv.Async, libuv.AsyncCb](func(a *libuv.Async) { - a.Close(nil) + var a *libuv.Async + var cb libuv.AsyncCb + a, cb = cbind.BindF[libuv.Async, libuv.AsyncCb](func(a *libuv.Async) { + a.Close(nil) + mutex.Lock() + currentCallbacks := callbacks + callbacks = nil + mutex.Unlock() + + for _, callback := range currentCallbacks { + callback(result) + } + }) + loop.Async(a, cb) + + // Execute fn immediately + fn(func(v T) { + result = v + resultReady.Store(true) + a.Send() + }) + + return func(chain func(T)) { + mutex.Lock() + if resultReady.Load() { + mutex.Unlock() chain(result) - }) - loop.Async(a, cb) - fn(func(v T) { - result = v - a.Send() - }) + } else { + callbacks = append(callbacks, chain) + mutex.Unlock() + } } } @@ -53,7 +75,7 @@ func Race[T1 any](futures ...Future[T1]) Future[T1] { return Async(func(resolve func(T1)) { done := atomic.Bool{} for _, future := range futures { - future(func(v T1) { + future.Then(func(v T1) { if !done.Swap(true) { // Just resolve the first one. resolve(v) @@ -70,7 +92,7 @@ func All[T1 any](futures ...Future[T1]) Future[[]T1] { var done uint32 for i, future := range futures { i := i - future(func(v T1) { + future.Then(func(v T1) { results[i] = v if atomic.AddUint32(&done, 1) == uint32(n) { // All done. diff --git a/x/async/executor_go.go b/x/async/executor_go.go index 60962638..38667aa7 100644 --- a/x/async/executor_go.go +++ b/x/async/executor_go.go @@ -22,7 +22,7 @@ package async func Run[T any](future Future[T]) T { ch := make(chan T) go func() { - future(func(v T) { + future.Then(func(v T) { ch <- v }) }() diff --git a/x/async/executor_llgo.go b/x/async/executor_llgo.go index e1e03a6a..af28295f 100644 --- a/x/async/executor_llgo.go +++ b/x/async/executor_llgo.go @@ -59,7 +59,7 @@ func Run[T any](future Future[T]) T { exec := &Executor{loop} oldExec := setExec(exec) var ret T - future(func(v T) { + future.Then(func(v T) { ret = v }) exec.Run() diff --git a/x/socketio/socketio_llgo.go b/x/socketio/socketio_llgo.go index e6b799a5..970e70c6 100644 --- a/x/socketio/socketio_llgo.go +++ b/x/socketio/socketio_llgo.go @@ -104,7 +104,7 @@ func Listen(protocol, bindAddr string, listenCb func(client *Conn, err error)) { listenCb(nil, err) return } - parseAddr(bindAddr)(func(v tuple.Tuple2[*net.SockAddr, error]) { + parseAddr(bindAddr).Then(func(v tuple.Tuple2[*net.SockAddr, error]) { addr, err := v.Get() if err != nil { listenCb(nil, err) @@ -167,7 +167,7 @@ func (l *Listener) accept() (client *Conn, err error) { func Connect(network, addr string) async.Future[tuple.Tuple2[*Conn, error]] { return async.Async(func(resolve func(tuple.Tuple2[*Conn, error])) { - parseAddr(addr)(func(v tuple.Tuple2[*net.SockAddr, error]) { + parseAddr(addr).Then(func(v tuple.Tuple2[*net.SockAddr, error]) { addr, err := v.Get() if err != nil { resolve(tuple.T2[*Conn, error]((*Conn)(nil), err))