diff --git a/x/async/README.md b/x/async/README.md index 058da95d..db79c8d5 100644 --- a/x/async/README.md +++ b/x/async/README.md @@ -361,56 +361,71 @@ In some situations, you may want to get the first result of multiple async opera ## Design considerations in LLGo -- Don't introduce `async`/`await` keywords to compatible with Go compiler (just compiling) -- For performance and memory reasons don't implement async functions with goroutines -- Avoid implementing `Promise` by using `chan` to avoid blocking the thread, but it can be wrapped as a `chan` to make it compatible `select` statement +- Don't introduce `async`/`await` keywords to compatible with Go +- For performance and memory reasons don't implement async functions with goroutines, coroutines, or other mechanisms that require per-task stack allocation +- Avoid implementing async task by using `chan` that blocking the thread ## Design ### `async.Future[T]` type -Introduce `async.Future[T]` type to represent an asynchronous operation that will produce a value of type `T`, similar to `Promise`/`Future` in other languages. +Introduce `async.Future[T]` type to represent an eventual completion (or failure) of an asynchronous operation and its resulting value, similar to `Promise`/`Future` in other languages. Functions that return `async.Future[T]` are considered asynchronous functions. ### Future creation -`async.Future[T]` can create by `async.Async[T]` function that takes a function that accepts a `resolve` function to produce a value of type `T`. +`async.Future[T]` can be created by `async.Async[T]` function that takes a function that accepts a `resolve` function to produce a value of type `T`. ### Future chaining (asynchronous callbacks style) -`async.Future[T]` can be chained with `Then` method to add multiple callbacks to be executed when the operation is completed, it just runs once and calls every callbacks. `Then` method can't be chained multiple times because Go doesn't support generics method currently (Need support `func (f Future[T]) Then[U any](f func(T) Future[U]) Future[U]`). +`async.Future[T]` can be chained with `Then` method to add multiple callbacks to be executed when the operation is completed, it just runs once and calls every callbacks. Currently `Then` method can't be chained multiple times because Go doesn't support generics method (Need support `func (f Future[T]) Then[U any](f func(T) Future[U]) Future[U]`) and function overload currently, maybe implements in Go+. ### Future waiting (synchronous style) -`async.Await[T]` function can be used to wait for the completion of a `Future[T]` and return the value produced by the operation. +`async.Await[T]` function can be used to wait for the completion of a `Future[T]` and return the value produced by the operation. In LLGo, `async.Await[T]` is a blocking function that waits for the completion of the `Future[T]` and returns the value synchronously, it would be transformed to `Future.Then` callback in the frontend. ### `async.Run[T]` function -`async.Run[T]` function can be used to run the asynchronous operation and get the result of the operation. Currently it will switch the callbacks to the goroutine that calls `async.Run[T]` function, this maybe changed in the future to reduce the overhead of switching goroutines and make it more parallel. +`async.Run[T]` function can be used to create an global asynchronous context and run async functions, and it would be hidden by the compiler in the future. -### `async.Future[T]` prototype +Currently it will switch the callbacks to the goroutine that calls `async.Run[T]` function, this maybe changed in the future to reduce the overhead of switching goroutines and make it more parallel. + +### Prototype ```go package async -type Future[T any] func(func(T)) - -func (f Future[T]) Then(f func(T)) +type Future[T any] interface { + Then(f func(T)) +} func Async[T any](f func(resolve func(T))) Future[T] func Await[T any](future Future[T]) T ``` +### Some async functions + +```go +package async + + +func Race[T1 any](futures ...Future[T1]) Future[T1] + +func All[T1 any](futures ...Future[T1]) Future[[]T1] + +``` + ### Example ```go package main func main() { - hello := func() Future[string] { - return func(resolve func(string)) { + async.Run(func() { + hello := func() async.Future[string] { + return async.Async(func(resolve func(string)) { resolve("Hello, World!") - } + }) } future := hello() @@ -421,7 +436,8 @@ func main() { println("second callback:", value) }) - println("first await:", Await(future)) - println("second await:", Await(future)) + println("first await:", async.Await(future)) + println("second await:", async.Await(future)) + }) } ``` diff --git a/x/async/_demo/all/all.go b/x/async/_demo/all/all.go index 80ce489f..652bed28 100644 --- a/x/async/_demo/all/all.go +++ b/x/async/_demo/all/all.go @@ -40,59 +40,54 @@ func sleep(i int, d time.Duration) async.Future[int] { } func main() { - RunIO() - RunAllAndRace() - RunTimeout() - RunMultipleCallbacksNodelay() - RunMultipleCallbacksDelay() - RunSocket() + async.Run(func(resolve func(async.Void)) { + RunIO() + RunAllAndRace() + RunTimeout() + RunMultipleCallbacksNodelay() + RunMultipleCallbacksDelay() + RunSocket() + }) } func RunIO() { println("RunIO with Await") // Hide `resolve` in Go+ - async.Run(async.Async(func(resolve func(async.Void)) { - println("read file") - defer resolve(async.Void{}) - content, err := async.Await(ReadFile("all.go")).Get() + + println("read file") + content, err := async.Await(ReadFile("all.go")).Get() + if err != nil { + fmt.Printf("read err: %v\n", err) + return + } + fmt.Printf("read content: %s\n", content) + err = async.Await(WriteFile("2.out", content)) + if err != nil { + fmt.Printf("write err: %v\n", err) + return + } + fmt.Printf("write done\n") + + // Translated Await to BindIO in Go+: + println("RunIO with BindIO") + + ReadFile("all.go").Then(func(v tuple.Tuple2[[]byte, error]) { + content, err := v.Get() if err != nil { fmt.Printf("read err: %v\n", err) return } fmt.Printf("read content: %s\n", content) - err = async.Await(WriteFile("2.out", content)) - if err != nil { - fmt.Printf("write err: %v\n", err) - return - } - fmt.Printf("write done\n") - })) - - // Translated Await to BindIO in Go+: - println("RunIO with BindIO") - - async.Run(async.Async(func(resolve func(async.Void)) { - ReadFile("all.go").Then(func(v tuple.Tuple2[[]byte, error]) { - content, err := v.Get() + WriteFile("2.out", content).Then(func(v error) { + err = v if err != nil { - fmt.Printf("read err: %v\n", err) - resolve(async.Void{}) + fmt.Printf("write err: %v\n", err) return } - fmt.Printf("read content: %s\n", content) - WriteFile("2.out", content).Then(func(v error) { - err = v - if err != nil { - fmt.Printf("write err: %v\n", err) - resolve(async.Void{}) - return - } - println("write done") - resolve(async.Void{}) - }) + println("write done") }) - })) + }) } func RunAllAndRace() { @@ -102,160 +97,133 @@ func RunAllAndRace() { println("Run All with Await") - async.Run(async.Async(func(resolve func(async.Void)) { - async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)).Then(func(v []int) { - fmt.Printf("All: %v\n", v) - resolve(async.Void{}) - }) - })) + async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)).Then(func(v []int) { + fmt.Printf("All: %v\n", v) + }) println("Run Race with Await") - async.Run(async.Async(func(resolve func(async.Void)) { - first := async.Race(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)) - v := async.Await(first) - fmt.Printf("Race: %v\n", v) - resolve(async.Void{}) - })) + first := async.Race(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)) + v := async.Await(first) + fmt.Printf("Race: %v\n", v) // Translated to in Go+: println("Run All with BindIO") - async.Run(async.Async(func(resolve func(async.Void)) { - async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)).Then(func(v []int) { - fmt.Printf("All: %v\n", v) - resolve(async.Void{}) - }) - })) + async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)).Then(func(v []int) { + fmt.Printf("All: %v\n", v) + }) println("Run Race with BindIO") - async.Run(async.Async(func(resolve func(async.Void)) { - async.Race(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)).Then(func(v int) { - fmt.Printf("Race: %v\n", v) - resolve(async.Void{}) - }) - })) + async.Race(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)).Then(func(v int) { + fmt.Printf("Race: %v\n", v) + }) + } func RunTimeout() { println("Run Timeout with Await") - async.Run(async.Async(func(resolve func(async.Void)) { - fmt.Printf("Start 100 ms timeout\n") - async.Await(timeout.Timeout(100 * time.Millisecond)) - fmt.Printf("timeout\n") - resolve(async.Void{}) - })) + fmt.Printf("Start 100 ms timeout\n") + async.Await(timeout.Timeout(100 * time.Millisecond)) + fmt.Printf("timeout\n") // Translated to in Go+: println("Run Timeout with BindIO") - async.Run(async.Async(func(resolve func(async.Void)) { - fmt.Printf("Start 100 ms timeout\n") - timeout.Timeout(100 * time.Millisecond).Then(func(async.Void) { - fmt.Printf("timeout\n") - resolve(async.Void{}) - }) - })) + fmt.Printf("Start 100 ms timeout\n") + timeout.Timeout(100 * time.Millisecond).Then(func(async.Void) { + fmt.Printf("timeout\n") + }) } func RunMultipleCallbacksNodelay() { println("Run Multiple Callbacks") runCnt := atomic.Int32{} - async.Run(async.Async(func(resolve func(async.Void)) { - nodelay := async.Async(func(resolve func(async.Void)) { - println("nodelay") - runCnt.Add(1) - resolve(async.Void{}) - }) - cbCnt := atomic.Int32{} - cb := func() { - if cbCnt.Add(1) == 2 { - resolve(async.Void{}) + nodelay := async.Async(func(resolve func(async.Void)) { + println("nodelay") + runCnt.Add(1) + }) + + cbCnt := atomic.Int32{} + cb := func() { + if cbCnt.Add(1) == 2 { + if runCnt.Load() != 1 { + panic("runCnt != 1, got: " + fmt.Sprint(runCnt.Load())) + } else { + println("runCnt == 1") } } - - nodelay.Then(func(async.Void) { - println("nodelay done") - cb() - }) - - nodelay.Then(func(async.Void) { - println("nodelay done again") - cb() - }) - })) - - if runCnt.Load() != 1 { - panic("runCnt != 1") } + nodelay.Then(func(async.Void) { + println("nodelay done") + cb() + }) + + nodelay.Then(func(async.Void) { + println("nodelay done again") + cb() + }) } func RunMultipleCallbacksDelay() { println("Run Multiple Callbacks") runCnt := atomic.Int32{} - async.Run(async.Async(func(resolve func(async.Void)) { - delay := async.Async(func(resolve func(async.Void)) { - timeout.Timeout(100 * time.Millisecond).Then(func(async.Void) { - println("delay") - runCnt.Add(1) - resolve(async.Void{}) - }) - }) - cbCnt := atomic.Int32{} - cb := func() { - if cbCnt.Add(1) == 2 { - resolve(async.Void{}) + delay := async.Async(func(resolve func(async.Void)) { + timeout.Timeout(100 * time.Millisecond).Then(func(async.Void) { + println("delay") + runCnt.Add(1) + }) + }) + + cbCnt := atomic.Int32{} + cb := func() { + if cbCnt.Add(1) == 2 { + if runCnt.Load() != 1 { + panic("runCnt != 1, got: " + fmt.Sprint(runCnt.Load())) + } else { + println("runCnt == 1") } } - - delay.Then(func(async.Void) { - println("delay done") - cb() - }) - - delay.Then(func(async.Void) { - println("delay done again") - cb() - }) - })) - - if runCnt.Load() != 1 { - panic("runCnt != 1") } + + delay.Then(func(async.Void) { + println("delay done") + cb() + }) + + delay.Then(func(async.Void) { + println("delay done again") + cb() + }) } func RunSocket() { println("Run Socket") - async.Run(async.Async(func(resolve func(async.Void)) { - println("RunServer") + println("RunServer") - RunServer().Then(func(async.Void) { - println("RunServer done") - resolve(async.Void{}) + RunServer().Then(func(async.Void) { + println("RunServer done") + }) + + println("RunClient") + + timeout.Timeout(100 * time.Millisecond).Then(func(async.Void) { + RunClient("Bob").Then(func(async.Void) { + println("RunClient done") }) - - println("RunClient") - - timeout.Timeout(100 * time.Millisecond).Then(func(async.Void) { - RunClient("Bob").Then(func(async.Void) { - println("RunClient done") - resolve(async.Void{}) - }) - RunClient("Uncle").Then(func(async.Void) { - println("RunClient done") - resolve(async.Void{}) - }) + RunClient("Uncle").Then(func(async.Void) { + println("RunClient done") }) - })) + }) } func RunClient(name string) async.Future[async.Void] { diff --git a/x/async/async_go.go b/x/async/async_go.go index 42addf15..7f905e80 100644 --- a/x/async/async_go.go +++ b/x/async/async_go.go @@ -27,20 +27,18 @@ func Async[T any](fn func(func(T))) Future[T] { var wg sync.WaitGroup wg.Add(1) - return func(chain func(T)) { - once.Do(func() { - go func() { - fn(func(v T) { - result = v - wg.Done() - }) - }() - }) - + once.Do(func() { go func() { - wg.Wait() - chain(result) + fn(func(v T) { + result = v + wg.Done() + }) }() + }) + + return func(chain func(T)) { + wg.Wait() + chain(result) } } diff --git a/x/async/async_llgo.go b/x/async/async_llgo.go index 8f297db5..b6bad936 100644 --- a/x/async/async_llgo.go +++ b/x/async/async_llgo.go @@ -28,66 +28,42 @@ import ( ) // Currently Async run chain a future that call chain in the goroutine running `async.Run`. -// Basic implementation: -// func Async[T any](fn func(func(T))) Future[T] { -// return func(chain func(T)) { -// loop := Exec().L - -// var result T -// var a *libuv.Async -// var cb libuv.AsyncCb -// a, cb = cbind.BindF[libuv.Async, libuv.AsyncCb](func(a *libuv.Async) { -// a.Close(nil) -// chain(result) -// }) -// loop.Async(a, cb) -// fn(func(v T) { -// result = v -// a.Send() -// }) -// } -// } -// -// Better implementation to support multiple callbacks: func Async[T any](fn func(func(T))) Future[T] { var result T - var done atomic.Bool var resultReady atomic.Bool var callbacks []func(T) var mutex sync.Mutex + loop := Exec().L + + var a *libuv.Async + var cb libuv.AsyncCb + a, cb = cbind.BindF[libuv.Async, libuv.AsyncCb](func(a *libuv.Async) { + a.Close(nil) + mutex.Lock() + currentCallbacks := callbacks + callbacks = nil + mutex.Unlock() + + for _, callback := range currentCallbacks { + callback(result) + } + }) + loop.Async(a, cb) + + // Execute fn immediately + fn(func(v T) { + result = v + resultReady.Store(true) + a.Send() + }) return func(chain func(T)) { mutex.Lock() if resultReady.Load() { mutex.Unlock() chain(result) - return - } - callbacks = append(callbacks, chain) - if !done.Swap(true) { - mutex.Unlock() - loop := Exec().L - - var a *libuv.Async - var cb libuv.AsyncCb - a, cb = cbind.BindF[libuv.Async, libuv.AsyncCb](func(a *libuv.Async) { - a.Close(nil) - mutex.Lock() - resultReady.Store(true) - currentCallbacks := callbacks - callbacks = nil - mutex.Unlock() - - for _, callback := range currentCallbacks { - callback(result) - } - }) - loop.Async(a, cb) - fn(func(v T) { - result = v - a.Send() - }) } else { + callbacks = append(callbacks, chain) mutex.Unlock() } }