async.Run as global context, async operations run immediately

This commit is contained in:
Li Jie
2024-09-10 11:43:44 +08:00
parent 44c4488fcc
commit 12f460e376
4 changed files with 178 additions and 220 deletions

View File

@@ -361,56 +361,71 @@ In some situations, you may want to get the first result of multiple async opera
## Design considerations in LLGo ## Design considerations in LLGo
- Don't introduce `async`/`await` keywords to compatible with Go compiler (just compiling) - Don't introduce `async`/`await` keywords to compatible with Go
- For performance and memory reasons don't implement async functions with goroutines - For performance and memory reasons don't implement async functions with goroutines, coroutines, or other mechanisms that require per-task stack allocation
- Avoid implementing `Promise` by using `chan` to avoid blocking the thread, but it can be wrapped as a `chan` to make it compatible `select` statement - Avoid implementing async task by using `chan` that blocking the thread
## Design ## Design
### `async.Future[T]` type ### `async.Future[T]` type
Introduce `async.Future[T]` type to represent an asynchronous operation that will produce a value of type `T`, similar to `Promise`/`Future` in other languages. Introduce `async.Future[T]` type to represent an eventual completion (or failure) of an asynchronous operation and its resulting value, similar to `Promise`/`Future` in other languages. Functions that return `async.Future[T]` are considered asynchronous functions.
### Future creation ### Future creation
`async.Future[T]` can create by `async.Async[T]` function that takes a function that accepts a `resolve` function to produce a value of type `T`. `async.Future[T]` can be created by `async.Async[T]` function that takes a function that accepts a `resolve` function to produce a value of type `T`.
### Future chaining (asynchronous callbacks style) ### Future chaining (asynchronous callbacks style)
`async.Future[T]` can be chained with `Then` method to add multiple callbacks to be executed when the operation is completed, it just runs once and calls every callbacks. `Then` method can't be chained multiple times because Go doesn't support generics method currently (Need support `func (f Future[T]) Then[U any](f func(T) Future[U]) Future[U]`). `async.Future[T]` can be chained with `Then` method to add multiple callbacks to be executed when the operation is completed, it just runs once and calls every callbacks. Currently `Then` method can't be chained multiple times because Go doesn't support generics method (Need support `func (f Future[T]) Then[U any](f func(T) Future[U]) Future[U]`) and function overload currently, maybe implements in Go+.
### Future waiting (synchronous style) ### Future waiting (synchronous style)
`async.Await[T]` function can be used to wait for the completion of a `Future[T]` and return the value produced by the operation. `async.Await[T]` function can be used to wait for the completion of a `Future[T]` and return the value produced by the operation. In LLGo, `async.Await[T]` is a blocking function that waits for the completion of the `Future[T]` and returns the value synchronously, it would be transformed to `Future.Then` callback in the frontend.
### `async.Run[T]` function ### `async.Run[T]` function
`async.Run[T]` function can be used to run the asynchronous operation and get the result of the operation. Currently it will switch the callbacks to the goroutine that calls `async.Run[T]` function, this maybe changed in the future to reduce the overhead of switching goroutines and make it more parallel. `async.Run[T]` function can be used to create an global asynchronous context and run async functions, and it would be hidden by the compiler in the future.
### `async.Future[T]` prototype Currently it will switch the callbacks to the goroutine that calls `async.Run[T]` function, this maybe changed in the future to reduce the overhead of switching goroutines and make it more parallel.
### Prototype
```go ```go
package async package async
type Future[T any] func(func(T)) type Future[T any] interface {
Then(f func(T))
func (f Future[T]) Then(f func(T)) }
func Async[T any](f func(resolve func(T))) Future[T] func Async[T any](f func(resolve func(T))) Future[T]
func Await[T any](future Future[T]) T func Await[T any](future Future[T]) T
``` ```
### Some async functions
```go
package async
func Race[T1 any](futures ...Future[T1]) Future[T1]
func All[T1 any](futures ...Future[T1]) Future[[]T1]
```
### Example ### Example
```go ```go
package main package main
func main() { func main() {
hello := func() Future[string] { async.Run(func() {
return func(resolve func(string)) { hello := func() async.Future[string] {
return async.Async(func(resolve func(string)) {
resolve("Hello, World!") resolve("Hello, World!")
} })
} }
future := hello() future := hello()
@@ -421,7 +436,8 @@ func main() {
println("second callback:", value) println("second callback:", value)
}) })
println("first await:", Await(future)) println("first await:", async.Await(future))
println("second await:", Await(future)) println("second await:", async.Await(future))
})
} }
``` ```

View File

@@ -40,59 +40,54 @@ func sleep(i int, d time.Duration) async.Future[int] {
} }
func main() { func main() {
RunIO() async.Run(func(resolve func(async.Void)) {
RunAllAndRace() RunIO()
RunTimeout() RunAllAndRace()
RunMultipleCallbacksNodelay() RunTimeout()
RunMultipleCallbacksDelay() RunMultipleCallbacksNodelay()
RunSocket() RunMultipleCallbacksDelay()
RunSocket()
})
} }
func RunIO() { func RunIO() {
println("RunIO with Await") println("RunIO with Await")
// Hide `resolve` in Go+ // Hide `resolve` in Go+
async.Run(async.Async(func(resolve func(async.Void)) {
println("read file") println("read file")
defer resolve(async.Void{}) content, err := async.Await(ReadFile("all.go")).Get()
content, err := async.Await(ReadFile("all.go")).Get() if err != nil {
fmt.Printf("read err: %v\n", err)
return
}
fmt.Printf("read content: %s\n", content)
err = async.Await(WriteFile("2.out", content))
if err != nil {
fmt.Printf("write err: %v\n", err)
return
}
fmt.Printf("write done\n")
// Translated Await to BindIO in Go+:
println("RunIO with BindIO")
ReadFile("all.go").Then(func(v tuple.Tuple2[[]byte, error]) {
content, err := v.Get()
if err != nil { if err != nil {
fmt.Printf("read err: %v\n", err) fmt.Printf("read err: %v\n", err)
return return
} }
fmt.Printf("read content: %s\n", content) fmt.Printf("read content: %s\n", content)
err = async.Await(WriteFile("2.out", content)) WriteFile("2.out", content).Then(func(v error) {
if err != nil { err = v
fmt.Printf("write err: %v\n", err)
return
}
fmt.Printf("write done\n")
}))
// Translated Await to BindIO in Go+:
println("RunIO with BindIO")
async.Run(async.Async(func(resolve func(async.Void)) {
ReadFile("all.go").Then(func(v tuple.Tuple2[[]byte, error]) {
content, err := v.Get()
if err != nil { if err != nil {
fmt.Printf("read err: %v\n", err) fmt.Printf("write err: %v\n", err)
resolve(async.Void{})
return return
} }
fmt.Printf("read content: %s\n", content) println("write done")
WriteFile("2.out", content).Then(func(v error) {
err = v
if err != nil {
fmt.Printf("write err: %v\n", err)
resolve(async.Void{})
return
}
println("write done")
resolve(async.Void{})
})
}) })
})) })
} }
func RunAllAndRace() { func RunAllAndRace() {
@@ -102,160 +97,133 @@ func RunAllAndRace() {
println("Run All with Await") println("Run All with Await")
async.Run(async.Async(func(resolve func(async.Void)) { async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)).Then(func(v []int) {
async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)).Then(func(v []int) { fmt.Printf("All: %v\n", v)
fmt.Printf("All: %v\n", v) })
resolve(async.Void{})
})
}))
println("Run Race with Await") println("Run Race with Await")
async.Run(async.Async(func(resolve func(async.Void)) { first := async.Race(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300))
first := async.Race(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)) v := async.Await(first)
v := async.Await(first) fmt.Printf("Race: %v\n", v)
fmt.Printf("Race: %v\n", v)
resolve(async.Void{})
}))
// Translated to in Go+: // Translated to in Go+:
println("Run All with BindIO") println("Run All with BindIO")
async.Run(async.Async(func(resolve func(async.Void)) { async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)).Then(func(v []int) {
async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)).Then(func(v []int) { fmt.Printf("All: %v\n", v)
fmt.Printf("All: %v\n", v) })
resolve(async.Void{})
})
}))
println("Run Race with BindIO") println("Run Race with BindIO")
async.Run(async.Async(func(resolve func(async.Void)) { async.Race(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)).Then(func(v int) {
async.Race(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)).Then(func(v int) { fmt.Printf("Race: %v\n", v)
fmt.Printf("Race: %v\n", v) })
resolve(async.Void{})
})
}))
} }
func RunTimeout() { func RunTimeout() {
println("Run Timeout with Await") println("Run Timeout with Await")
async.Run(async.Async(func(resolve func(async.Void)) { fmt.Printf("Start 100 ms timeout\n")
fmt.Printf("Start 100 ms timeout\n") async.Await(timeout.Timeout(100 * time.Millisecond))
async.Await(timeout.Timeout(100 * time.Millisecond)) fmt.Printf("timeout\n")
fmt.Printf("timeout\n")
resolve(async.Void{})
}))
// Translated to in Go+: // Translated to in Go+:
println("Run Timeout with BindIO") println("Run Timeout with BindIO")
async.Run(async.Async(func(resolve func(async.Void)) { fmt.Printf("Start 100 ms timeout\n")
fmt.Printf("Start 100 ms timeout\n") timeout.Timeout(100 * time.Millisecond).Then(func(async.Void) {
timeout.Timeout(100 * time.Millisecond).Then(func(async.Void) { fmt.Printf("timeout\n")
fmt.Printf("timeout\n") })
resolve(async.Void{})
})
}))
} }
func RunMultipleCallbacksNodelay() { func RunMultipleCallbacksNodelay() {
println("Run Multiple Callbacks") println("Run Multiple Callbacks")
runCnt := atomic.Int32{} runCnt := atomic.Int32{}
async.Run(async.Async(func(resolve func(async.Void)) {
nodelay := async.Async(func(resolve func(async.Void)) {
println("nodelay")
runCnt.Add(1)
resolve(async.Void{})
})
cbCnt := atomic.Int32{} nodelay := async.Async(func(resolve func(async.Void)) {
cb := func() { println("nodelay")
if cbCnt.Add(1) == 2 { runCnt.Add(1)
resolve(async.Void{}) })
cbCnt := atomic.Int32{}
cb := func() {
if cbCnt.Add(1) == 2 {
if runCnt.Load() != 1 {
panic("runCnt != 1, got: " + fmt.Sprint(runCnt.Load()))
} else {
println("runCnt == 1")
} }
} }
nodelay.Then(func(async.Void) {
println("nodelay done")
cb()
})
nodelay.Then(func(async.Void) {
println("nodelay done again")
cb()
})
}))
if runCnt.Load() != 1 {
panic("runCnt != 1")
} }
nodelay.Then(func(async.Void) {
println("nodelay done")
cb()
})
nodelay.Then(func(async.Void) {
println("nodelay done again")
cb()
})
} }
func RunMultipleCallbacksDelay() { func RunMultipleCallbacksDelay() {
println("Run Multiple Callbacks") println("Run Multiple Callbacks")
runCnt := atomic.Int32{} runCnt := atomic.Int32{}
async.Run(async.Async(func(resolve func(async.Void)) {
delay := async.Async(func(resolve func(async.Void)) {
timeout.Timeout(100 * time.Millisecond).Then(func(async.Void) {
println("delay")
runCnt.Add(1)
resolve(async.Void{})
})
})
cbCnt := atomic.Int32{} delay := async.Async(func(resolve func(async.Void)) {
cb := func() { timeout.Timeout(100 * time.Millisecond).Then(func(async.Void) {
if cbCnt.Add(1) == 2 { println("delay")
resolve(async.Void{}) runCnt.Add(1)
})
})
cbCnt := atomic.Int32{}
cb := func() {
if cbCnt.Add(1) == 2 {
if runCnt.Load() != 1 {
panic("runCnt != 1, got: " + fmt.Sprint(runCnt.Load()))
} else {
println("runCnt == 1")
} }
} }
delay.Then(func(async.Void) {
println("delay done")
cb()
})
delay.Then(func(async.Void) {
println("delay done again")
cb()
})
}))
if runCnt.Load() != 1 {
panic("runCnt != 1")
} }
delay.Then(func(async.Void) {
println("delay done")
cb()
})
delay.Then(func(async.Void) {
println("delay done again")
cb()
})
} }
func RunSocket() { func RunSocket() {
println("Run Socket") println("Run Socket")
async.Run(async.Async(func(resolve func(async.Void)) { println("RunServer")
println("RunServer")
RunServer().Then(func(async.Void) { RunServer().Then(func(async.Void) {
println("RunServer done") println("RunServer done")
resolve(async.Void{}) })
println("RunClient")
timeout.Timeout(100 * time.Millisecond).Then(func(async.Void) {
RunClient("Bob").Then(func(async.Void) {
println("RunClient done")
}) })
RunClient("Uncle").Then(func(async.Void) {
println("RunClient") println("RunClient done")
timeout.Timeout(100 * time.Millisecond).Then(func(async.Void) {
RunClient("Bob").Then(func(async.Void) {
println("RunClient done")
resolve(async.Void{})
})
RunClient("Uncle").Then(func(async.Void) {
println("RunClient done")
resolve(async.Void{})
})
}) })
})) })
} }
func RunClient(name string) async.Future[async.Void] { func RunClient(name string) async.Future[async.Void] {

View File

@@ -27,20 +27,18 @@ func Async[T any](fn func(func(T))) Future[T] {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)
return func(chain func(T)) { once.Do(func() {
once.Do(func() {
go func() {
fn(func(v T) {
result = v
wg.Done()
})
}()
})
go func() { go func() {
wg.Wait() fn(func(v T) {
chain(result) result = v
wg.Done()
})
}() }()
})
return func(chain func(T)) {
wg.Wait()
chain(result)
} }
} }

View File

@@ -28,66 +28,42 @@ import (
) )
// Currently Async run chain a future that call chain in the goroutine running `async.Run`. // Currently Async run chain a future that call chain in the goroutine running `async.Run`.
// Basic implementation:
// func Async[T any](fn func(func(T))) Future[T] {
// return func(chain func(T)) {
// loop := Exec().L
// var result T
// var a *libuv.Async
// var cb libuv.AsyncCb
// a, cb = cbind.BindF[libuv.Async, libuv.AsyncCb](func(a *libuv.Async) {
// a.Close(nil)
// chain(result)
// })
// loop.Async(a, cb)
// fn(func(v T) {
// result = v
// a.Send()
// })
// }
// }
//
// Better implementation to support multiple callbacks:
func Async[T any](fn func(func(T))) Future[T] { func Async[T any](fn func(func(T))) Future[T] {
var result T var result T
var done atomic.Bool
var resultReady atomic.Bool var resultReady atomic.Bool
var callbacks []func(T) var callbacks []func(T)
var mutex sync.Mutex var mutex sync.Mutex
loop := Exec().L
var a *libuv.Async
var cb libuv.AsyncCb
a, cb = cbind.BindF[libuv.Async, libuv.AsyncCb](func(a *libuv.Async) {
a.Close(nil)
mutex.Lock()
currentCallbacks := callbacks
callbacks = nil
mutex.Unlock()
for _, callback := range currentCallbacks {
callback(result)
}
})
loop.Async(a, cb)
// Execute fn immediately
fn(func(v T) {
result = v
resultReady.Store(true)
a.Send()
})
return func(chain func(T)) { return func(chain func(T)) {
mutex.Lock() mutex.Lock()
if resultReady.Load() { if resultReady.Load() {
mutex.Unlock() mutex.Unlock()
chain(result) chain(result)
return
}
callbacks = append(callbacks, chain)
if !done.Swap(true) {
mutex.Unlock()
loop := Exec().L
var a *libuv.Async
var cb libuv.AsyncCb
a, cb = cbind.BindF[libuv.Async, libuv.AsyncCb](func(a *libuv.Async) {
a.Close(nil)
mutex.Lock()
resultReady.Store(true)
currentCallbacks := callbacks
callbacks = nil
mutex.Unlock()
for _, callback := range currentCallbacks {
callback(result)
}
})
loop.Async(a, cb)
fn(func(v T) {
result = v
a.Send()
})
} else { } else {
callbacks = append(callbacks, chain)
mutex.Unlock() mutex.Unlock()
} }
} }