asyncio: x/io -> x/async

This commit is contained in:
Li Jie
2024-07-29 23:17:21 +08:00
parent 375b2b579e
commit 0d3e78ad94
5 changed files with 25 additions and 25 deletions

464
x/async/README.md Normal file
View File

@@ -0,0 +1,464 @@
# Async I/O Design
## Async functions in different languages
### JavaScript
- [Async/Await](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/async_function)
Prototype:
```javascript
async function name(param0) {
statements;
}
async function name(param0, param1) {
statements;
}
async function name(param0, param1, /* …, */ paramN) {
statements;
}
```
Example:
```typescript
async function resolveAfter1Second(): Promise<string> {
return new Promise((resolve) => {
setTimeout(() => {
resolve("Resolved after 1 second");
}, 1000);
});
}
async function asyncCall(): Promise<string> {
const result = await resolveAfter1Second();
return `AsyncCall: ${result}`;
}
function asyncCall2(): Promise<string> {
return resolveAfter1Second();
}
function asyncCall3(): void {
resolveAfter1Second().then((result) => {
console.log(`AsyncCall3: ${result}`);
});
}
async function main() {
console.log("Starting AsyncCall");
const result1 = await asyncCall();
console.log(result1);
console.log("Starting AsyncCall2");
const result2 = await asyncCall2();
console.log(result2);
console.log("Starting AsyncCall3");
asyncCall3();
// Wait for AsyncCall3 to complete
await new Promise((resolve) => setTimeout(resolve, 1000));
console.log("Main function completed");
}
main().catch(console.error);
```
### Python
- [async def](https://docs.python.org/3/library/asyncio-task.html#coroutines)
Prototype:
```python
async def name(param0):
statements
```
Example:
```python
import asyncio
async def resolve_after_1_second() -> str:
await asyncio.sleep(1)
return "Resolved after 1 second"
async def async_call() -> str:
result = await resolve_after_1_second()
return f"AsyncCall: {result}"
def async_call2() -> asyncio.Task:
return resolve_after_1_second()
def async_call3() -> None:
asyncio.create_task(print_after_1_second())
async def print_after_1_second() -> None:
result = await resolve_after_1_second()
print(f"AsyncCall3: {result}")
async def main():
print("Starting AsyncCall")
result1 = await async_call()
print(result1)
print("Starting AsyncCall2")
result2 = await async_call2()
print(result2)
print("Starting AsyncCall3")
async_call3()
# Wait for AsyncCall3 to complete
await asyncio.sleep(1)
print("Main function completed")
# Run the main coroutine
asyncio.run(main())
```
### Rust
- [async fn](https://doc.rust-lang.org/std/keyword.async.html)
Prototype:
```rust
async fn name(param0: Type) -> ReturnType {
statements
}
```
Example:
```rust
use std::time::Duration;
use tokio::time::sleep;
use std::future::Future;
async fn resolve_after_1_second() -> String {
sleep(Duration::from_secs(1)).await;
"Resolved after 1 second".to_string()
}
async fn async_call() -> String {
let result = resolve_after_1_second().await;
format!("AsyncCall: {}", result)
}
fn async_call2() -> impl Future<Output = String> {
resolve_after_1_second()
}
fn async_call3() {
tokio::spawn(async {
let result = resolve_after_1_second().await;
println!("AsyncCall3: {}", result);
});
}
#[tokio::main]
async fn main() {
println!("Starting AsyncCall");
let result1 = async_call().await;
println!("{}", result1);
println!("Starting AsyncCall2");
let result2 = async_call2().await;
println!("{}", result2);
println!("Starting AsyncCall3");
async_call3();
// Wait for AsyncCall3 to complete
sleep(Duration::from_secs(2)).await;
println!("Main function completed");
}
```
### C#
- [async](https://docs.microsoft.com/en-us/dotnet/csharp/programming-guide/concepts/async/)
Prototype:
```csharp
async Task<ReturnType> NameAsync(Type param0)
{
statements;
}
```
Example:
```csharp
using System;
using System.Threading.Tasks;
class Program
{
static async Task<string> ResolveAfter1Second()
{
await Task.Delay(1000);
return "Resolved after 1 second";
}
static async Task<string> AsyncCall()
{
string result = await ResolveAfter1Second();
return $"AsyncCall: {result}";
}
static Task<string> AsyncCall2()
{
return ResolveAfter1Second();
}
static void AsyncCall3()
{
_ = Task.Run(async () =>
{
string result = await ResolveAfter1Second();
Console.WriteLine($"AsyncCall3: {result}");
});
}
static async Task Main()
{
Console.WriteLine("Starting AsyncCall");
string result1 = await AsyncCall();
Console.WriteLine(result1);
Console.WriteLine("Starting AsyncCall2");
string result2 = await AsyncCall2();
Console.WriteLine(result2);
Console.WriteLine("Starting AsyncCall3");
AsyncCall3();
// Wait for AsyncCall3 to complete
await Task.Delay(1000);
Console.WriteLine("Main method completed");
}
}
```
### C++ 20 Coroutines
- [co_await](https://en.cppreference.com/w/cpp/language/coroutines)
Prototype:
```cpp
TaskReturnType NameAsync(Type param0)
{
co_return co_await expression;
}
```
Example:
```cpp
#include <cppcoro/task.hpp>
#include <cppcoro/sync_wait.hpp>
#include <cppcoro/when_all.hpp>
#include <chrono>
#include <iostream>
#include <thread>
cppcoro::task<std::string> resolveAfter1Second() {
co_await std::chrono::seconds(1);
co_return "Resolved after 1 second";
}
cppcoro::task<std::string> asyncCall() {
auto result = co_await resolveAfter1Second();
co_return "AsyncCall: " + result;
}
cppcoro::task<std::string> asyncCall2() {
return resolveAfter1Second();
}
cppcoro::task<void> asyncCall3() {
auto result = co_await resolveAfter1Second();
std::cout << "AsyncCall3: " << result << std::endl;
}
cppcoro::task<void> main() {
std::cout << "Starting AsyncCall" << std::endl;
auto result1 = co_await asyncCall();
std::cout << result1 << std::endl;
std::cout << "Starting AsyncCall2" << std::endl;
auto result2 = co_await asyncCall2();
std::cout << result2 << std::endl;
std::cout << "Starting AsyncCall3" << std::endl;
auto asyncCall3Task = asyncCall3();
// Wait for AsyncCall3 to complete
co_await asyncCall3Task;
std::cout << "Main function completed" << std::endl;
}
int main() {
try {
cppcoro::sync_wait(::main());
} catch (const std::exception& e) {
std::cerr << "Error: " << e.what() << std::endl;
return 1;
}
return 0;
}
```
## Common concepts
### Promise, Future, Task, and Coroutine
- **Promise**: An object that represents the eventual completion (or failure) of an asynchronous operation and its resulting value. It is used to produce a value that will be consumed by a `Future`.
- **Future**: An object that represents the result of an asynchronous operation. It is used to obtain the value produced by a `Promise`.
- **Task**: A unit of work that can be scheduled and executed asynchronously. It is a higher-level abstraction that combines a `Promise` and a `Future`.
- **Coroutine**: A special type of function that can suspend its execution and return control to the caller without losing its state. It can be resumed later, allowing for asynchronous programming.
### `async`, `await` and similar keywords
- **`async`**: A keyword used to define a function that returns a `Promise` or `Task`. It allows the function to pause its execution and resume later.
- **`await`**: A keyword used to pause the execution of an `async` function until a `Promise` or `Task` is resolved. It unwraps the value of the `Promise` or `Task` and allows the function to continue.
- **`co_return`**: A keyword used in C++ coroutines to return a value from a coroutine. It is similar to `return` but is used in coroutines to indicate that the coroutine has completed. It's similar to `return` in `async` functions in other languages that boxes the value into a `Promise` or `Task`.
`async/await` and similar constructs provide a more readable and synchronous-like way of writing asynchronous code, it hides the type of `Promise`/`Future`/`Task` from the user and allows them to focus on the logic of the code.
### Executing Multiple Async Operations Concurrently
To run multiple promises concurrently, JavaScript provides `Promise.all`, `Promise.allSettled` and `Promise.any`, Python provides `asyncio.gather`, Rust provides `tokio::try_join`, C# provides `Task.WhenAll`, and C++ provides `cppcoro::when_all`.
In some situations, you may want to get the first result of multiple async operations. JavaScript provides `Promise.race` to get the first result of multiple promises. Python provides `asyncio.wait` to get the first result of multiple coroutines. Rust provides `tokio::select!` to get the first result of multiple futures. C# provides `Task.WhenAny` to get the first result of multiple tasks. C++ provides `cppcoro::when_any` to get the first result of multiple tasks. Those functions are very simular to `select` in Go.
### Error Handling
`await` commonly unwraps the value of a `Promise` or `Task`, but it also propagates errors. If the `Promise` or `Task` is rejected or throws an error, the error will be thrown in the `async` function by the `await` keyword. You can use `try/catch` blocks to handle errors in `async` functions.
## Common patterns
- `async` keyword hides the types of `Promise`/`Future`/`Task` in the function signature in Python and Rust, but not in JavaScript, C#, and C++.
- `await` keyword unwraps the value of a `Promise`/`Future`/`Task`.
- `return` keyword boxes the value into a `Promise`/`Future`/`Task` if it's not already.
## Design considerations in LLGo
- Don't introduce `async`/`await` keywords to compatible with Go compiler (just compiling)
- For performance reason don't implement async functions with goroutines
- Avoid implementing `Promise` by using `chan` to avoid blocking the thread, but it can be wrapped as a `chan` to make it compatible `select` statement
## Design
Introduce `Promise` type to represent an asynchronous operation and its resulting value. `Promise` can be resolved with a value with an error. `Promise` can be awaited to get the value and error.
`Promise` just a type indicating the asynchronous operation, it can't be created and assigned directly. It be replaced to `PromiseImpl` by the LLGo compiler.
```go
// Some native async functions
func timeoutAsync(d time.Duration, cb func()) {
go func() {
time.Sleep(d)
cb()
}()
}
// Wrap callback-based async function into Promise
func resolveAfter1Second() (resolve Promise[string]) {
timeoutAsync(1 * time.Second, func() {
resolve("Resolved after 1 second", nil)
})
}
// Compiled to:
func resolveAfter1Second() (resolve PromiseImpl[string]) {
promise := io.NewPromiseImpl[string](resolve func(value string, err error) {
resolve: func(value string, err error) {
for true {
switch (promise.prev = promise.next) {
case 0:
timeoutAsync(1 * time.Second, func() {
resolve("Resolved after 1 second", nil)
})
}
}
},
}
return promise
}
func asyncCall() (resolve Promise[string]) {
str, err := resolveAfter1Second().Await()
resolve("AsyncCall: " + str, err)
}
// Compiled to:
func asyncCall() (resolve PromiseImpl[string]) {
promise := io.NewPromiseImpl[string](resolve func(value string, err error) {
for true {
switch (promise.prev = promise.next) {
case 0:
resolveAfter1Second()
return
case 1:
str, err := promise.value, promise.err
resolve("AsyncCall: " + str, err)
return
}
}
})
return promise
}
// Directly return Promise
func asyncCall2() Promise[string] {
return resolveAfter1Second()
}
// Compiled to:
func asyncCall2() PromiseImpl[string] {
return resolveAfter1Second()
}
// Don't wait for Promise to complete
func asyncCall3() {
resolveAfter1Second().Then(func(result string) {
fmt.Println("AsyncCall3: " + result)
})
}
func asyncMain() {
fmt.Println("Starting AsyncCall")
result1 := asyncCall().Await()
fmt.Println(result1)
fmt.Println("Starting AsyncCall2")
result2 := asyncCall2().Await()
fmt.Println(result2)
fmt.Println("Starting AsyncCall3")
asyncCall3()
// Wait for AsyncCall3 to complete
time.Sleep(2 * time.Second)
fmt.Println("Main function completed")
}
```

View File

@@ -0,0 +1,623 @@
package main
import (
"encoding/json"
"fmt"
"log"
"strings"
"time"
"github.com/goplus/llgo/x/async"
"github.com/goplus/llgo/x/async/naive"
"github.com/goplus/llgo/x/tuple"
)
// -----------------------------------------------------------------------------
func http(method string, url string, callback func(resp *Response, err error)) {
go func() {
body := ""
if strings.HasPrefix(url, "http://example.com/user/") {
name := url[len("http://example.com/user/"):]
body = `{"name":"` + name + `"}`
} else if strings.HasPrefix(url, "http://example.com/score/") {
body = "99.5"
}
time.Sleep(200 * time.Millisecond)
resp := &Response{StatusCode: 200, Body: body}
callback(resp, nil)
}()
}
// -----------------------------------------------------------------------------
type Response struct {
StatusCode int
Body string
}
func (r *Response) Text() (co *async.Promise[tuple.Tuple2[string, error]]) {
co.Return(tuple.Tuple2[string, error]{V1: r.Body, V2: nil})
return
}
func (r *Response) TextCompiled() *naive.PromiseImpl[tuple.Tuple2[string, error]] {
co := &naive.PromiseImpl[tuple.Tuple2[string, error]]{}
co.Debug = "Text"
co.Func = func() {
switch co.Next {
case 0:
co.Next = -1
co.Return(tuple.Tuple2[string, error]{V1: r.Body, V2: nil})
return
default:
panic("Promise already done")
}
}
return co
}
// async AsyncHttpGet(url string) (resp *Response, err error) {
// http("GET", url, func(resp *Response, err error) {
// return resp, err
// })
// }
func AsyncHttpGet(url string) *async.Promise[tuple.Tuple2[*Response, error]] {
co := &async.Promise[tuple.Tuple2[*Response, error]]{}
http("GET", url, func(resp *Response, err error) {
co.Return(tuple.Tuple2[*Response, error]{V1: resp, V2: nil})
})
co.Suspend()
return co
}
func AsyncHttpGetCompiled(url string) *naive.PromiseImpl[tuple.Tuple2[*Response, error]] {
co := &naive.PromiseImpl[tuple.Tuple2[*Response, error]]{}
co.Debug = "HttpGet"
co.Func = func() {
switch co.Next {
case 0:
co.Next = -1
http("GET", url, func(resp *Response, err error) {
co.Return(tuple.Tuple2[*Response, error]{V1: resp, V2: nil})
})
co.Suspend()
return
default:
panic("Promise already done")
}
}
return co
}
func AsyncHttpPost(url string) *async.Promise[tuple.Tuple2[*Response, error]] {
co := &async.Promise[tuple.Tuple2[*Response, error]]{}
http("POST", url, func(resp *Response, err error) {
co.Return(tuple.Tuple2[*Response, error]{V1: resp, V2: nil})
})
co.Suspend()
return co
}
func AsyncHttpPostCompiled(url string) *naive.PromiseImpl[tuple.Tuple2[*Response, error]] {
P := &naive.PromiseImpl[tuple.Tuple2[*Response, error]]{}
P.Debug = "HttpPost"
P.Func = func() {
switch P.Next {
case 0:
P.Next = -1
http("POST", url, func(resp *Response, err error) {
P.Return(tuple.Tuple2[*Response, error]{V1: resp, V2: nil})
})
return
default:
panic("Promise already done")
}
}
return P
}
// -----------------------------------------------------------------------------
type User struct {
Name string
}
func GetUser(name string) (co *naive.PromiseImpl[tuple.Tuple2[User, error]]) {
resp, err := AsyncHttpGet("http://example.com/user/" + name).Await().Values()
if err != nil {
// return User{}, err
co.Return(tuple.Tuple2[User, error]{V1: User{}, V2: err})
return
}
if resp.StatusCode != 200 {
// return User{}, fmt.Errorf("http status code: %d", resp.StatusCode)
co.Return(tuple.Tuple2[User, error]{V1: User{}, V2: fmt.Errorf("http status code: %d", resp.StatusCode)})
return
}
body, err := resp.Text().Await().Values()
if err != nil {
// return User{}, err
co.Return(tuple.Tuple2[User, error]{V1: User{}, V2: err})
return
}
user := User{}
if err := json.Unmarshal([]byte(body), &user); err != nil {
// return User{}, err
co.Return(tuple.Tuple2[User, error]{V1: User{}, V2: err})
return
}
// return user, nil
co.Return(tuple.Tuple2[User, error]{V1: user, V2: nil})
return
}
func GetUserCompiled(name string) (co *naive.PromiseImpl[tuple.Tuple2[User, error]]) {
var state1 *naive.PromiseImpl[tuple.Tuple2[*Response, error]]
var state2 *naive.PromiseImpl[tuple.Tuple2[string, error]]
co = &naive.PromiseImpl[tuple.Tuple2[User, error]]{}
co.Debug = "GetUser"
co.Func = func() {
switch co.Next {
case 0:
co.Next = 1
state1 = AsyncHttpGetCompiled("http://example.com/user/" + name)
state1.Exec = co.Exec
state1.Parent = co
state1.Call()
return
case 1:
co.Next = 2
resp, err := state1.Value().Values()
log.Printf("resp: %v, err: %v\n", resp, err)
if err != nil {
co.Return(tuple.Tuple2[User, error]{V1: User{}, V2: err})
return
}
if resp.StatusCode != 200 {
co.Return(tuple.Tuple2[User, error]{V1: User{}, V2: fmt.Errorf("http status code: %d", resp.StatusCode)})
return
}
state2 = resp.TextCompiled()
state2.Exec = co.Exec
state2.Parent = co
state2.Call()
return
case 2:
co.Next = -1
body, err := state2.Value().Values()
if err != nil {
co.Return(tuple.Tuple2[User, error]{V1: User{}, V2: err})
return
}
user := User{}
log.Printf("body: %v\n", body)
if err := json.Unmarshal([]byte(body), &user); err != nil {
co.Return(tuple.Tuple2[User, error]{V1: User{}, V2: err})
return
}
log.Printf("resolve user: %+v\n", user)
co.Return(tuple.Tuple2[User, error]{V1: user, V2: nil})
return
default:
panic(fmt.Errorf("Promise already done, %+v", co))
}
}
return
}
func GetScore() (co *naive.PromiseImpl[tuple.Tuple2[float64, error]]) {
resp, err := AsyncHttpGet("http://example.com/score/").Await().Values()
if err != nil {
co.Return(tuple.Tuple2[float64, error]{V1: 0, V2: err})
return
}
if resp.StatusCode != 200 {
// return 0, fmt.Errorf("http status code: %d", resp.StatusCode)
co.Return(tuple.Tuple2[float64, error]{V1: 0, V2: fmt.Errorf("http status code: %d", resp.StatusCode)})
return
}
body, err := resp.Text().Await().Values()
if err != nil {
// return 0, err
co.Return(tuple.Tuple2[float64, error]{V1: 0, V2: err})
return
}
score := 0.0
if _, err := fmt.Sscanf(body, "%f", &score); err != nil {
// return 0, err
co.Return(tuple.Tuple2[float64, error]{V1: 0, V2: err})
return
}
// return score, nil
co.Return(tuple.Tuple2[float64, error]{V1: score, V2: nil})
return
}
func GetScoreCompiled() *naive.PromiseImpl[tuple.Tuple2[float64, error]] {
var state1 *naive.PromiseImpl[tuple.Tuple2[*Response, error]]
var state2 *naive.PromiseImpl[tuple.Tuple2[string, error]]
co := &naive.PromiseImpl[tuple.Tuple2[float64, error]]{}
co.Debug = "GetScore"
co.Func = func() {
switch co.Next {
case 0:
co.Next = 1
state1 = AsyncHttpGetCompiled("http://example.com/score/")
state1.Exec = co.Exec
state1.Parent = co
state1.Call()
return
case 1:
co.Next = 2
resp, err := state1.Value().Values()
if err != nil {
co.Return(tuple.Tuple2[float64, error]{V1: 0, V2: err})
return
}
if resp.StatusCode != 200 {
co.Return(tuple.Tuple2[float64, error]{V1: 0, V2: fmt.Errorf("http status code: %d", resp.StatusCode)})
return
}
state2 = resp.TextCompiled()
state2.Exec = co.Exec
state2.Parent = co
state2.Call()
return
case 2:
co.Next = -1
body, err := state2.Value().Values()
if err != nil {
co.Return(tuple.Tuple2[float64, error]{V1: 0, V2: err})
return
}
score := 0.0
if _, err := fmt.Sscanf(body, "%f", &score); err != nil {
co.Return(tuple.Tuple2[float64, error]{V1: 0, V2: err})
return
}
co.Return(tuple.Tuple2[float64, error]{V1: score, V2: nil})
return
default:
panic("Promise already done")
}
}
return co
}
func DoUpdate(op string) (co *naive.PromiseImpl[error]) {
resp, err := AsyncHttpPost("http://example.com/update/" + op).Await().Values()
if err != nil {
co.Return(err)
return
}
if resp.StatusCode != 200 {
co.Return(fmt.Errorf("http status code: %d", resp.StatusCode))
}
co.Return(nil)
return
}
func DoUpdateCompiled(op string) *naive.PromiseImpl[error] {
var state1 *naive.PromiseImpl[tuple.Tuple2[*Response, error]]
co := &naive.PromiseImpl[error]{}
co.Debug = "DoUpdate"
co.Func = func() {
switch co.Next {
case 0:
co.Next = 1
state1 = AsyncHttpPostCompiled("http://example.com/update/" + op)
state1.Exec = co.Exec
state1.Parent = co
state1.Call()
return
case 1:
co.Next = -1
resp, err := state1.Value().Values()
if err != nil {
co.Return(err)
return
}
if resp.StatusCode != 200 {
co.Return(fmt.Errorf("http status code: %d", resp.StatusCode))
return
}
co.Return(nil)
return
default:
panic("Promise already done")
}
}
return co
}
func GenInts() (co *naive.PromiseImpl[int]) {
co.Yield(3)
co.Yield(2)
co.Yield(5)
return
}
func GenIntsCompiled() *naive.PromiseImpl[int] {
co := &naive.PromiseImpl[int]{}
co.Debug = "GenInts"
co.Func = func() {
switch co.Next {
case 0:
co.Next = 1
co.Yield(3)
return
case 1:
co.Next = 2
co.Yield(2)
return
case 2:
co.Next = 3
co.Yield(5)
return
case 3:
co.Next = -1
default:
panic("Generator already done")
}
}
return co
}
// Generator with async calls and panic
func GenUsers() (co *naive.PromiseImpl[User]) {
u, err := GetUser("Alice").Await().Values()
if err != nil {
panic(err)
}
co.Yield(u)
u, err = GetUser("Bob").Await().Values()
if err != nil {
panic(err)
}
co.Yield(u)
u, err = GetUser("Cindy").Await().Values()
if err != nil {
panic(err)
}
co.Yield(u)
log.Printf("genUsers done\n")
return
}
func GenUsersCompiled() (resolve *naive.PromiseImpl[User]) {
var state1, state2, state3 *naive.PromiseImpl[tuple.Tuple2[User, error]]
co := &naive.PromiseImpl[User]{}
co.Debug = "GenUsers"
co.Func = func() {
switch co.Next {
case 0:
co.Next = 1
state1 = GetUserCompiled("Alice")
state1.Exec = co.Exec
state1.Parent = co
state1.Call()
return
case 1:
co.Next = 2
u, err := state1.Value().Values()
if err != nil {
panic(err)
} else {
co.Yield(u)
}
return
case 2:
co.Next = 3
state2 = GetUserCompiled("Bob")
state2.Exec = co.Exec
state2.Parent = co
state2.Call()
return
case 3:
co.Next = 4
u, err := state2.Value().Values()
if err != nil {
panic(err)
} else {
co.Yield(u)
}
return
case 4:
co.Next = 5
state3 = GetUserCompiled("Cindy")
state3.Exec = co.Exec
state3.Parent = co
state3.Call()
return
case 5:
co.Next = 6
u, err := state3.Value().Values()
if err != nil {
panic(err)
} else {
co.Yield(u)
}
return
case 6:
co.Next = -1
default:
panic("Generator already done")
}
}
return co
}
func Demo() (co *async.Promise[async.Void]) {
user, err := GetUser("1").Await().Values()
log.Println(user, err)
user, err = naive.Race[tuple.Tuple2[User, error]](GetUser("2"), GetUser("3"), GetUser("4")).Value().Values()
log.Println(user, err)
users := naive.All[tuple.Tuple2[User, error]]([]naive.AsyncCall[tuple.Tuple2[User, error]]{GetUser("5"), GetUser("6"), GetUser("7")}).Value()
log.Println(users, err)
user, score, _ := naive.Await3Compiled[User, float64, async.Void](GetUser("8"), GetScore(), DoUpdate("update sth.")).Value().Values()
log.Println(user, score, err)
// for loop with generator
g := GenInts()
for {
g.Call()
if g.Done() {
break
}
log.Println("genInt:", g.Value(), g.Done())
}
// for loop with async generator
// for u, err := range GenUsers() {...}
g1 := GenUsers()
for {
g1.Call()
u := g1.Await()
if g1.Done() {
break
}
log.Println("genUser:", u, err)
}
// TODO(lijie): select from multiple promises without channel
// select {
// case user := <-GetUser("123").Chan():
// log.Println("user:", user)
// case score := <-GetScore().Chan():
// log.Println("score:", score)
// case <-async.Timeout(5 * time.Second).Chan():
// log.Println("timeout")
// }
log.Println("Demo done")
co.Return(async.Void{})
return
}
func DemoCompiled() *naive.PromiseImpl[async.Void] {
var state1 *naive.PromiseImpl[tuple.Tuple2[User, error]]
var state2 *naive.PromiseImpl[tuple.Tuple2[User, error]]
var state3 *naive.PromiseImpl[[]tuple.Tuple2[User, error]]
var state4 *naive.PromiseImpl[tuple.Tuple3[tuple.Tuple2[User, error], tuple.Tuple2[float64, error], error]]
var g1 *naive.PromiseImpl[int]
var g2 *naive.PromiseImpl[User]
P := &naive.PromiseImpl[async.Void]{}
P.Debug = "Demo"
P.Func = func() {
switch P.Next {
case 0:
P.Next = 1
state1 = GetUserCompiled("1")
state1.Exec = P.Exec
state1.Parent = P
state1.Call()
return
case 1:
P.Next = 2
user, err := state1.Value().Values()
log.Printf("user: %+v, err: %v\n", user, err)
state2 = naive.Race[tuple.Tuple2[User, error]](GetUserCompiled("2"), GetUserCompiled("3"), GetUserCompiled("4"))
state2.Exec = P.Exec
state2.Parent = P
state2.Call()
return
case 2:
P.Next = 3
user, err := state2.Value().Values()
log.Printf("race user: %+v, err: %v\n", user, err)
state3 = naive.All[tuple.Tuple2[User, error]]([]naive.AsyncCall[tuple.Tuple2[User, error]]{GetUserCompiled("5"), GetUserCompiled("6"), GetUserCompiled("7")})
state3.Exec = P.Exec
state3.Parent = P
state3.Call()
return
case 3:
P.Next = 4
users := state3.Value()
log.Println(users)
state4 = naive.Await3Compiled[tuple.Tuple2[User, error], tuple.Tuple2[float64, error], error](GetUserCompiled("8"), GetScoreCompiled(), DoUpdateCompiled("update sth."))
state4.Exec = P.Exec
state4.Parent = P
state4.Call()
return
case 4:
P.Next = 5
user, score, _ := state4.Value().Values()
log.Println(user, score)
g1 = GenIntsCompiled()
for {
g1.Call()
if g1.Done() {
break
}
log.Printf("genInt: %+v, done: %v\n", g1.Value(), g1.Done())
}
g2 = GenUsersCompiled()
g2.Exec = P.Exec
g2.Parent = P
g2.Call()
return
case 5:
g2.Call()
if g2.Done() {
P.Next = -1
log.Printf("Demo done\n")
P.Return(async.Void{})
return
}
log.Printf("genUser: %+v, done: %v\n", g2.Value(), g2.Done())
return
default:
panic("Promise already done")
}
}
return P
}
func main() {
log.SetFlags(log.Lshortfile | log.LstdFlags)
log.Printf("=========== Run Naive Demo ===========\n")
v := naive.RunImpl[async.Void](DemoCompiled())
log.Println(v)
log.Printf("=========== Run Naive Demo finished ===========\n")
log.Printf("=========== Run Demo ===========\n")
v1 := Demo()
log.Println(v1)
log.Printf("=========== Run Demo finished ===========\n")
}

73
x/async/async.go Normal file
View File

@@ -0,0 +1,73 @@
/*
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package async
import (
"unsafe"
_ "unsafe"
)
const (
LLGoPackage = "decl"
)
const debugAsync = false
type Void = [0]byte
type AsyncCall[TOut any] interface{}
// -----------------------------------------------------------------------------
type Promise[TOut any] struct {
hdl unsafe.Pointer
value TOut
}
// llgo:link PromiseImpl llgo.coAwait
func (p *Promise[TOut]) Await() TOut {
panic("should not executed")
}
// llgo:link Return llgo.coReturn
func (p *Promise[TOut]) Return(v TOut) {
panic("should not executed")
}
// llgo:link Yield llgo.coYield
func (p *Promise[TOut]) Yield(v TOut) {
panic("should not executed")
}
// llgo:link Suspend llgo.coSuspend
func (p *Promise[TOut]) Suspend() {
panic("should not executed")
}
// llgo:link Resume llgo.coResume
func (p *Promise[TOut]) Resume() {
panic("should not executed")
}
func (p *Promise[TOut]) Value() TOut {
return p.value
}
// llgo:link Run llgo.coRun
func Run[TOut any](f func() TOut) TOut {
panic("should not executed")
}

285
x/async/naive/extra.go Normal file
View File

@@ -0,0 +1,285 @@
/*
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package naive
import (
"log"
"sync"
"time"
_ "unsafe"
"github.com/goplus/llgo/x/async"
"github.com/goplus/llgo/x/tuple"
)
// -----------------------------------------------------------------------------
func TimeoutCompiled(d time.Duration) *PromiseImpl[async.Void] {
P := &PromiseImpl[async.Void]{}
P.Debug = "Timeout"
P.Func = func() {
go func() {
time.Sleep(d)
P.Return(async.Void{})
}()
}
return P
}
type Result[T any] struct {
V T
Err error
}
func Race[OutT any](acs ...AsyncCall[OutT]) *PromiseImpl[OutT] {
if len(acs) == 0 {
panic("race: no promise")
}
ps := make([]*PromiseImpl[OutT], len(acs))
for idx, ac := range acs {
ps[idx] = ac.(*PromiseImpl[OutT])
}
remaining := len(acs)
returned := false
P := &PromiseImpl[OutT]{}
P.Debug = "Race"
P.Func = func() {
switch P.Next {
case 0:
P.Next = 1
for _, p := range ps {
p.Exec = P.Exec
p.Parent = P
p.Call()
}
return
case 1:
remaining--
if remaining < 0 {
log.Fatalf("race: remaining < 0: %+v\n", remaining)
}
if returned {
return
}
for _, p := range ps {
if p.Done() {
if debugAsync {
log.Printf("async.Race done: %+v won the race\n", p)
}
returned = true
P.Return(p.value)
return
}
}
log.Fatalf("no promise done: %+v\n", ps)
return
default:
panic("unreachable")
}
}
return P
}
func All[OutT any](acs []AsyncCall[OutT]) *PromiseImpl[[]OutT] {
ps := make([]*PromiseImpl[OutT], len(acs))
for idx, ac := range acs {
ps[idx] = ac.(*PromiseImpl[OutT])
}
done := 0
P := &PromiseImpl[[]OutT]{}
P.Debug = "All"
P.Func = func() {
switch P.Next {
case 0:
P.Next = 1
for _, p := range ps {
p.Exec = P.Exec
p.Parent = P
p.Call()
}
return
case 1:
done++
if done < len(acs) {
return
}
P.Next = -1
for _, p := range ps {
if !p.Done() {
log.Fatalf("async.All: not done: %+v\n", p)
}
}
ret := make([]OutT, len(acs))
for idx, p := range ps {
ret[idx] = p.value
}
if debugAsync {
log.Printf("async.All done: %+v\n", ret)
}
P.Return(ret)
return
default:
panic("unreachable")
}
}
return P
}
// llgo:link Await2 llgo.await
func Await2Compiled[OutT1, OutT2 any](
ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2],
timeout ...time.Duration) (ret *PromiseImpl[tuple.Tuple3[OutT1, OutT2, error]]) {
p1 := ac1.(*PromiseImpl[OutT1])
p2 := ac2.(*PromiseImpl[OutT2])
remaining := 2
P := &PromiseImpl[tuple.Tuple3[OutT1, OutT2, error]]{}
P.Debug = "Await2"
P.Func = func() {
switch P.Next {
case 0:
P.Next = 1
p1.Exec = P.Exec
p1.Parent = P
p1.Call()
p2.Exec = P.Exec
p2.Parent = P
p2.Call()
return
case 1:
remaining--
if remaining > 0 {
return
}
P.Next = -1
if !p1.Done() || !p2.Done() {
log.Fatalf("async.Await2: not done: %+v, %+v\n", p1, p2)
}
P.Return(tuple.Tuple3[OutT1, OutT2, error]{
V1: p1.value,
V2: p2.value,
V3: nil,
})
return
default:
panic("unreachable")
}
}
return P
}
// llgo:link Await2 llgo.await
func Await3Compiled[OutT1, OutT2, OutT3 any](
ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], ac3 AsyncCall[OutT3],
timeout ...time.Duration) *PromiseImpl[tuple.Tuple3[OutT1, OutT2, OutT3]] {
p1 := ac1.(*PromiseImpl[OutT1])
p2 := ac2.(*PromiseImpl[OutT2])
p3 := ac3.(*PromiseImpl[OutT3])
remaining := 3
P := &PromiseImpl[tuple.Tuple3[OutT1, OutT2, OutT3]]{}
P.Debug = "Await3"
P.Func = func() {
switch P.Next {
case 0:
P.Next = 1
p1.Exec = P.Exec
p1.Parent = P
p1.Call()
p2.Exec = P.Exec
p2.Parent = P
p2.Call()
p3.Exec = P.Exec
p3.Parent = P
p3.Call()
return
case 1:
remaining--
if remaining > 0 {
return
}
P.Next = -1
// TODO(lijie): return every error?
if !p1.Done() || !p2.Done() || !p3.Done() {
log.Fatalf("async.Await3: not done: %+v, %+v, %+v\n", p1, p2, p3)
}
P.Return(tuple.Tuple3[OutT1, OutT2, OutT3]{
V1: p1.value,
V2: p2.value,
V3: p3.value,
})
return
default:
panic("unreachable")
}
}
return P
}
func PAllCompiled[OutT any](acs ...AsyncCall[OutT]) *PromiseImpl[[]OutT] {
P := &PromiseImpl[[]OutT]{}
P.Debug = "Parallel"
P.Func = func() {
ret := make([]OutT, len(acs))
wg := sync.WaitGroup{}
for idx, ac := range acs {
idx := idx
ac := ac
wg.Add(1)
go func(ac AsyncCall[OutT]) {
v := RunImpl[OutT](ac)
ret[idx] = v
wg.Done()
}(ac)
}
wg.Wait()
P.Return(ret)
}
return P
}
func PAwait3Compiled[OutT1, OutT2, OutT3 any](
ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], ac3 AsyncCall[OutT3]) *PromiseImpl[tuple.Tuple4[OutT1, OutT2, OutT3, error]] {
P := &PromiseImpl[tuple.Tuple4[OutT1, OutT2, OutT3, error]]{}
P.Debug = "PAwait3"
P.Func = func() {
ret := tuple.Tuple4[OutT1, OutT2, OutT3, error]{}
wg := sync.WaitGroup{}
wg.Add(3)
go func() {
ret.V1 = RunImpl[OutT1](ac1)
wg.Done()
}()
go func() {
ret.V2 = RunImpl[OutT2](ac2)
wg.Done()
}()
go func() {
ret.V3 = RunImpl[OutT3](ac3)
wg.Done()
}()
wg.Wait()
P.Return(ret)
}
return P
}

154
x/async/naive/naive.go Normal file
View File

@@ -0,0 +1,154 @@
/*
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package naive
import (
"log"
"sync"
)
const debugAsync = false
// -----------------------------------------------------------------------------
type asyncCall interface {
parent() asyncCall
Resume()
Call()
Done() bool
}
type AsyncCall[OutT any] interface {
Resume()
}
type executor struct {
acs []asyncCall
mu sync.Mutex
cond *sync.Cond
}
func newExecutor() *executor {
e := &executor{}
e.cond = sync.NewCond(&e.mu)
return e
}
func (e *executor) schedule(ac asyncCall) {
e.mu.Lock()
e.acs = append(e.acs, ac)
e.mu.Unlock()
e.cond.Signal()
}
func RunImpl[OutT any](ac AsyncCall[OutT]) OutT {
e := newExecutor()
p := ac.(*PromiseImpl[OutT])
p.Exec = e
var rootAc asyncCall = p
e.schedule(rootAc)
for {
e.mu.Lock()
for len(e.acs) == 0 {
e.cond.Wait()
}
e.mu.Unlock()
ac := e.acs[0]
e.acs = e.acs[1:]
ac.Call()
if ac.Done() && ac == rootAc {
return p.value
}
}
}
// -----------------------------------------------------------------------------
type PromiseImpl[TOut any] struct {
Debug string
Next int
Exec *executor
Parent asyncCall
Func func()
value TOut
c chan TOut
}
func (p *PromiseImpl[TOut]) parent() asyncCall {
return p.Parent
}
func (p *PromiseImpl[TOut]) Resume() {
if debugAsync {
log.Printf("Resume task: %+v\n", p)
}
p.Exec.schedule(p)
}
func (p *PromiseImpl[TOut]) Done() bool {
return p.Next == -1
}
func (p *PromiseImpl[TOut]) Call() {
p.Func()
}
func (p *PromiseImpl[TOut]) Suspend() {
}
func (p *PromiseImpl[TOut]) Return(v TOut) {
// TODO(lijie): panic if already resolved
p.value = v
if p.c != nil {
p.c <- v
}
if debugAsync {
log.Printf("Return task: %+v\n", p)
}
if p.Parent != nil {
p.Parent.Resume()
}
}
func (p *PromiseImpl[TOut]) Yield(v TOut) {
p.value = v
if debugAsync {
log.Printf("Yield task: %+v\n", p)
}
if p.Parent != nil {
p.Parent.Resume()
}
}
func (p *PromiseImpl[TOut]) Value() TOut {
return p.value
}
func (p *PromiseImpl[TOut]) Chan() <-chan TOut {
if p.c == nil {
p.c = make(chan TOut, 1)
p.Func()
}
return p.c
}
func (p *PromiseImpl[TOut]) Await() (ret TOut) {
panic("should not called")
}