From a578155dcb0e42feab80a996f9525649783df07e Mon Sep 17 00:00:00 2001 From: Li Jie Date: Sun, 28 Jul 2024 17:56:30 +0800 Subject: [PATCH] asyncio: multi return types promise/generator --- x/io/_demo/asyncdemo/async.go | 465 ++++++++++++++++++---------------- x/io/extra.go | 162 ++++-------- x/io/io.go | 114 +++++---- 3 files changed, 371 insertions(+), 370 deletions(-) diff --git a/x/io/_demo/asyncdemo/async.go b/x/io/_demo/asyncdemo/async.go index 48483ce6..4cce61c4 100644 --- a/x/io/_demo/asyncdemo/async.go +++ b/x/io/_demo/asyncdemo/async.go @@ -12,35 +12,7 @@ import ( // ----------------------------------------------------------------------------- -type Response struct { - StatusCode int - - Body string -} - -func (r *Response) Text() (resolve io.Promise[string]) { - // return r.Body, nil - resolve(r.Body, nil) - return -} - -func (r *Response) TextCompiled() *io.PromiseImpl[string] { - P := &io.PromiseImpl[string]{} - P.Debug = "Text" - P.Func = func(resolve func(string, error)) { - switch P.Next { - case 0: - P.Next = -1 - resolve(r.Body, nil) - return - default: - panic("Promise already done") - } - } - return P -} - -func Http(method string, url string, callback func(resp *Response, err error)) { +func http(method string, url string, callback func(resp *Response, err error)) { go func() { body := "" if strings.HasPrefix(url, "http://example.com/user/") { @@ -55,40 +27,86 @@ func Http(method string, url string, callback func(resp *Response, err error)) { }() } -func AsyncHttpGet(url string) (resolve io.Promise[*Response]) { - Http("GET", url, resolve) - return +// ----------------------------------------------------------------------------- + +type Response struct { + StatusCode int + + Body string } -func AsyncHttpGetCompiled(url string) *io.PromiseImpl[*Response] { - P := &io.PromiseImpl[*Response]{} - P.Debug = "HttpGet" - P.Func = func(resolve func(*Response, error)) { - switch P.Next { +func (r *Response) Text() *io.Promise[io.R2[string, error]] { + co := &io.Promise[io.R2[string, error]]{} + co.Func = func() { + co.Return(io.R2[string, error]{V1: r.Body, V2: nil}) + } + return co +} + +func (r *Response) TextCompiled() *io.Promise[io.R2[string, error]] { + co := &io.Promise[io.R2[string, error]]{} + co.Debug = "Text" + co.Func = func() { + switch co.Next { case 0: - P.Next = -1 - Http("GET", url, resolve) + co.Next = -1 + co.Return(io.R2[string, error]{V1: r.Body, V2: nil}) return default: panic("Promise already done") } } - return P + return co } -func AsyncHttpPost(url string) (resolve io.Promise[*Response]) { - Http("POST", url, resolve) - return +func AsyncHttpGet(url string) *io.Promise[io.R2[*Response, error]] { + co := &io.Promise[io.R2[*Response, error]]{} + co.Func = func() { + http("GET", url, func(resp *Response, err error) { + co.Return(io.R2[*Response, error]{V1: resp, V2: nil}) + }) + } + return co } -func AsyncHttpPostCompiled(url string) *io.PromiseImpl[*Response] { - P := &io.PromiseImpl[*Response]{} +func AsyncHttpGetCompiled(url string) *io.Promise[io.R2[*Response, error]] { + co := &io.Promise[io.R2[*Response, error]]{} + co.Debug = "HttpGet" + co.Func = func() { + switch co.Next { + case 0: + co.Next = -1 + http("GET", url, func(resp *Response, err error) { + co.Return(io.R2[*Response, error]{V1: resp, V2: nil}) + }) + return + default: + panic("Promise already done") + } + } + return co +} + +func AsyncHttpPost(url string) *io.Promise[io.R2[*Response, error]] { + co := &io.Promise[io.R2[*Response, error]]{} + co.Func = func() { + http("POST", url, func(resp *Response, err error) { + co.Return(io.R2[*Response, error]{V1: resp, V2: nil}) + }) + } + return co +} + +func AsyncHttpPostCompiled(url string) *io.Promise[io.R2[*Response, error]] { + P := &io.Promise[io.R2[*Response, error]]{} P.Debug = "HttpPost" - P.Func = func(resolve func(*Response, error)) { + P.Func = func() { switch P.Next { case 0: P.Next = -1 - Http("POST", url, resolve) + http("POST", url, func(resp *Response, err error) { + P.Return(io.R2[*Response, error]{V1: resp, V2: nil}) + }) return default: panic("Promise already done") @@ -103,351 +121,365 @@ type User struct { Name string } -func GetUser(name string) (resolve io.Promise[User]) { - resp, err := AsyncHttpGet("http://example.com/user/" + name).Await() +func GetUser(name string) (co *io.Promise[io.R2[User, error]]) { + resp, err := AsyncHttpGet("http://example.com/user/" + name).Await().Values() if err != nil { // return User{}, err - resolve(User{}, err) + co.Return(io.R2[User, error]{V1: User{}, V2: err}) return } if resp.StatusCode != 200 { // return User{}, fmt.Errorf("http status code: %d", resp.StatusCode) - resolve(User{}, fmt.Errorf("http status code: %d", resp.StatusCode)) + co.Return(io.R2[User, error]{V1: User{}, V2: fmt.Errorf("http status code: %d", resp.StatusCode)}) return } - body, err := resp.Text().Await() + body, err := resp.Text().Await().Values() if err != nil { // return User{}, err - resolve(User{}, err) + co.Return(io.R2[User, error]{V1: User{}, V2: err}) return } user := User{} if err := json.Unmarshal([]byte(body), &user); err != nil { // return User{}, err - resolve(User{}, err) + co.Return(io.R2[User, error]{V1: User{}, V2: err}) return } // return user, nil - resolve(user, nil) + co.Return(io.R2[User, error]{V1: user, V2: nil}) return } -func GetUserCompiled(name string) *io.PromiseImpl[User] { - var state1 *io.PromiseImpl[*Response] - var state2 *io.PromiseImpl[string] +func GetUserCompiled(name string) (co *io.Promise[io.R2[User, error]]) { + var state1 *io.Promise[io.R2[*Response, error]] + var state2 *io.Promise[io.R2[string, error]] - P := &io.PromiseImpl[User]{} - P.Debug = "GetUser" - P.Func = func(resolve func(User, error)) { - switch P.Next { + co = &io.Promise[io.R2[User, error]]{} + co.Debug = "GetUser" + co.Func = func() { + switch co.Next { case 0: - P.Next = 1 + co.Next = 1 state1 = AsyncHttpGetCompiled("http://example.com/user/" + name) - state1.Exec = P.Exec - state1.Parent = P + state1.Exec = co.Exec + state1.Parent = co state1.Call() return case 1: - P.Next = 2 - resp, err := state1.Value(), state1.Err() + co.Next = 2 + resp, err := state1.Value().Values() log.Printf("resp: %v, err: %v\n", resp, err) if err != nil { - resolve(User{}, err) + co.Return(io.R2[User, error]{V1: User{}, V2: err}) return } if resp.StatusCode != 200 { - resolve(User{}, fmt.Errorf("http status code: %d", resp.StatusCode)) + co.Return(io.R2[User, error]{V1: User{}, V2: fmt.Errorf("http status code: %d", resp.StatusCode)}) return } state2 = resp.TextCompiled() - state2.Exec = P.Exec - state2.Parent = P + state2.Exec = co.Exec + state2.Parent = co state2.Call() return case 2: - P.Next = -1 - body, err := state2.Value(), state2.Err() + co.Next = -1 + body, err := state2.Value().Values() if err != nil { - resolve(User{}, err) + co.Return(io.R2[User, error]{V1: User{}, V2: err}) return } user := User{} log.Printf("body: %v\n", body) if err := json.Unmarshal([]byte(body), &user); err != nil { - resolve(User{}, err) + co.Return(io.R2[User, error]{V1: User{}, V2: err}) return } log.Printf("resolve user: %+v\n", user) - resolve(user, nil) + co.Return(io.R2[User, error]{V1: user, V2: nil}) return default: - panic(fmt.Sprintf("Promise already done, %+v", P)) + panic(fmt.Errorf("Promise already done, %+v", co)) } } - return P + return } -func GetScore() (resolve io.Promise[float64]) { - resp, err := AsyncHttpGet("http://example.com/score/").Await() +func GetScore() (co *io.Promise[io.R2[float64, error]]) { + resp, err := AsyncHttpGet("http://example.com/score/").Await().Values() if err != nil { - // return 0, err - resolve(0, err) + co.Return(io.R2[float64, error]{V1: 0, V2: err}) return } if resp.StatusCode != 200 { // return 0, fmt.Errorf("http status code: %d", resp.StatusCode) - resolve(0, fmt.Errorf("http status code: %d", resp.StatusCode)) + co.Return(io.R2[float64, error]{V1: 0, V2: fmt.Errorf("http status code: %d", resp.StatusCode)}) return } - body, err := resp.Text().Await() + body, err := resp.Text().Await().Values() if err != nil { // return 0, err - resolve(0, err) + co.Return(io.R2[float64, error]{V1: 0, V2: err}) return } score := 0.0 if _, err := fmt.Sscanf(body, "%f", &score); err != nil { // return 0, err - resolve(0, err) + co.Return(io.R2[float64, error]{V1: 0, V2: err}) return } // return score, nil - resolve(score, nil) + co.Return(io.R2[float64, error]{V1: score, V2: nil}) return } -func GetScoreCompiled() *io.PromiseImpl[float64] { - var state1 *io.PromiseImpl[*Response] - var state2 *io.PromiseImpl[string] +func GetScoreCompiled() *io.Promise[io.R2[float64, error]] { + var state1 *io.Promise[io.R2[*Response, error]] + var state2 *io.Promise[io.R2[string, error]] - P := &io.PromiseImpl[float64]{} - P.Debug = "GetScore" - P.Func = func(resolve func(float64, error)) { - switch P.Next { + co := &io.Promise[io.R2[float64, error]]{} + co.Debug = "GetScore" + co.Func = func() { + switch co.Next { case 0: - P.Next = 1 + co.Next = 1 state1 = AsyncHttpGetCompiled("http://example.com/score/") - state1.Exec = P.Exec - state1.Parent = P + state1.Exec = co.Exec + state1.Parent = co state1.Call() return case 1: - P.Next = 2 - resp, err := state1.Value(), state1.Err() + co.Next = 2 + + resp, err := state1.Value().Values() if err != nil { - resolve(0, err) + co.Return(io.R2[float64, error]{V1: 0, V2: err}) return } if resp.StatusCode != 200 { - resolve(0, fmt.Errorf("http status code: %d", resp.StatusCode)) + co.Return(io.R2[float64, error]{V1: 0, V2: fmt.Errorf("http status code: %d", resp.StatusCode)}) return } state2 = resp.TextCompiled() - state2.Exec = P.Exec - state2.Parent = P + state2.Exec = co.Exec + state2.Parent = co state2.Call() return case 2: - P.Next = -1 - body, err := state2.Value(), state2.Err() + co.Next = -1 + body, err := state2.Value().Values() if err != nil { - resolve(0, err) + co.Return(io.R2[float64, error]{V1: 0, V2: err}) return } score := 0.0 if _, err := fmt.Sscanf(body, "%f", &score); err != nil { - resolve(0, err) + co.Return(io.R2[float64, error]{V1: 0, V2: err}) return } - resolve(score, nil) + co.Return(io.R2[float64, error]{V1: score, V2: nil}) return default: panic("Promise already done") } } - return P + return co } -func DoUpdate(op string) (resolve io.Promise[io.Void]) { - resp, err := AsyncHttpPost("http://example.com/update/" + op).Await() +func DoUpdate(op string) (co *io.Promise[error]) { + resp, err := AsyncHttpPost("http://example.com/update/" + op).Await().Values() if err != nil { - // return err - resolve(io.Void{}, err) + co.Return(err) return } if resp.StatusCode != 200 { - // return fmt.Errorf("http status code: %d", resp.StatusCode) - resolve(io.Void{}, fmt.Errorf("http status code: %d", resp.StatusCode)) - return + co.Return(fmt.Errorf("http status code: %d", resp.StatusCode)) } - // return nil - resolve(io.Void{}, nil) + co.Return(nil) return } -func DoUpdateCompiled(op string) *io.PromiseImpl[io.Void] { - var state1 *io.PromiseImpl[*Response] +func DoUpdateCompiled(op string) *io.Promise[error] { + var state1 *io.Promise[io.R2[*Response, error]] - P := &io.PromiseImpl[io.Void]{} - P.Debug = "DoUpdate" - P.Func = func(resolve func(io.Void, error)) { - switch P.Next { + co := &io.Promise[error]{} + co.Debug = "DoUpdate" + co.Func = func() { + switch co.Next { case 0: - P.Next = 1 + co.Next = 1 state1 = AsyncHttpPostCompiled("http://example.com/update/" + op) - state1.Exec = P.Exec - state1.Parent = P + state1.Exec = co.Exec + state1.Parent = co state1.Call() return case 1: - P.Next = -1 - resp, err := state1.Value(), state1.Err() + co.Next = -1 + resp, err := state1.Value().Values() if err != nil { - resolve(io.Void{}, err) + co.Return(err) return } if resp.StatusCode != 200 { - resolve(io.Void{}, fmt.Errorf("http status code: %d", resp.StatusCode)) + co.Return(fmt.Errorf("http status code: %d", resp.StatusCode)) return } - resolve(io.Void{}, nil) + co.Return(nil) return default: panic("Promise already done") } } - return P + return co } -func GenInts() (yield io.Promise[int]) { - yield(3, nil) - yield(2, nil) - yield(5, nil) +func GenInts() (co *io.Promise[int]) { + co.Yield(3) + co.Yield(2) + co.Yield(5) return } -func GenIntsCompiled() *io.PromiseImpl[int] { - P := &io.PromiseImpl[int]{} - P.Debug = "GenInts" - P.Func = func(resolve func(int, error)) { - switch P.Next { +func GenIntsCompiled() *io.Promise[int] { + co := &io.Promise[int]{} + co.Debug = "GenInts" + co.Func = func() { + switch co.Next { case 0: - P.Next = 1 - resolve(3, nil) + co.Next = 1 + co.Yield(3) return case 1: - P.Next = 2 - resolve(2, nil) + co.Next = 2 + co.Yield(2) return case 2: - P.Next = 3 - resolve(5, nil) + co.Next = 3 + co.Yield(5) return case 3: - P.Next = -1 - resolve(0, fmt.Errorf("stop")) - return + co.Next = -1 default: panic("Generator already done") } } - return P + return co } -func GenUsers() (yield io.Promise[User]) { - u, _ := GetUser("Alice").Await() - yield(u, nil) - u, _ = GetUser("Bob").Await() - yield(u, nil) - u, _ = GetUser("Cindy").Await() - yield(u, nil) +// Generator with async calls and panic +func GenUsers() (co *io.Promise[User]) { + u, err := GetUser("Alice").Await().Values() + if err != nil { + panic(err) + } + co.Yield(u) + u, err = GetUser("Bob").Await().Values() + if err != nil { + panic(err) + } + co.Yield(u) + u, err = GetUser("Cindy").Await().Values() + if err != nil { + panic(err) + } + co.Yield(u) log.Printf("genUsers done\n") return } -func GenUsersCompiled() (resolve *io.PromiseImpl[User]) { - var state1, state2, state3 *io.PromiseImpl[User] +func GenUsersCompiled() (resolve *io.Promise[User]) { + var state1, state2, state3 *io.Promise[io.R2[User, error]] - P := &io.PromiseImpl[User]{} - P.Debug = "GenUsers" - P.Func = func(resolve func(User, error)) { - switch P.Next { + co := &io.Promise[User]{} + co.Debug = "GenUsers" + co.Func = func() { + switch co.Next { case 0: - P.Next = 1 + co.Next = 1 state1 = GetUserCompiled("Alice") - state1.Exec = P.Exec - state1.Parent = P + state1.Exec = co.Exec + state1.Parent = co state1.Call() return case 1: - P.Next = 2 - u, _ := state1.Value(), state1.Err() - resolve(u, nil) + co.Next = 2 + u, err := state1.Value().Values() + if err != nil { + panic(err) + } else { + co.Yield(u) + } return case 2: - P.Next = 3 + co.Next = 3 state2 = GetUserCompiled("Bob") - state2.Exec = P.Exec - state2.Parent = P + state2.Exec = co.Exec + state2.Parent = co state2.Call() return case 3: - P.Next = 4 - u, _ := state2.Value(), state2.Err() - resolve(u, nil) + co.Next = 4 + u, err := state2.Value().Values() + if err != nil { + panic(err) + } else { + co.Yield(u) + } return case 4: - P.Next = 5 + co.Next = 5 state3 = GetUserCompiled("Cindy") - state3.Exec = P.Exec - state3.Parent = P + state3.Exec = co.Exec + state3.Parent = co state3.Call() return case 5: - P.Next = 6 - u, _ := state3.Value(), state3.Err() - resolve(u, nil) + co.Next = 6 + u, err := state3.Value().Values() + if err != nil { + panic(err) + } else { + co.Yield(u) + } return case 6: - P.Next = -1 - resolve(User{}, fmt.Errorf("stop")) - return + co.Next = -1 default: panic("Generator already done") } } - return P + return co } -func Demo() (resolve io.Promise[io.Void]) { - user, err := GetUser("1").Await() +func Demo() { + user, err := GetUser("1").Await().Values() log.Println(user, err) - user, err = io.Race[User](GetUser("2"), GetUser("3"), GetUser("4")).Await() + user, err = io.Race[io.R2[User, error]](GetUser("2"), GetUser("3"), GetUser("4")).Value().Values() log.Println(user, err) - users, err := io.All[User]([]io.AsyncCall[User]{GetUser("5"), GetUser("6"), GetUser("7")}).Await() + users := io.All[io.R2[User, error]]([]io.AsyncCall[io.R2[User, error]]{GetUser("5"), GetUser("6"), GetUser("7")}).Value() log.Println(users, err) - user, score, _, err := io.Await3[User, float64, io.Void](GetUser("8"), GetScore(), DoUpdate("update sth.")) + user, score, _ := io.Await3Compiled[User, float64, io.Void](GetUser("8"), GetScore(), DoUpdate("update sth.")).Value().Values() log.Println(user, score, err) // for loop with generator @@ -481,20 +513,19 @@ func Demo() (resolve io.Promise[io.Void]) { // case <-io.Timeout(5 * time.Second).Chan(): // log.Println("timeout") // } - return } -func DemoCompiled() *io.PromiseImpl[io.Void] { - var state1 *io.PromiseImpl[User] - var state2 *io.PromiseImpl[User] - var state3 *io.PromiseImpl[[]io.Result[User]] - var state4 *io.PromiseImpl[io.Await3Result[User, float64, io.Void]] - var g1 *io.PromiseImpl[int] - var g2 *io.PromiseImpl[User] +func DemoCompiled() *io.Promise[io.Void] { + var state1 *io.Promise[io.R2[User, error]] + var state2 *io.Promise[io.R2[User, error]] + var state3 *io.Promise[[]io.R2[User, error]] + var state4 *io.Promise[io.R3[io.R2[User, error], io.R2[float64, error], error]] + var g1 *io.Promise[int] + var g2 *io.Promise[User] - P := &io.PromiseImpl[io.Void]{} + P := &io.Promise[io.Void]{} P.Debug = "Demo" - P.Func = func(resolve func(io.Void, error)) { + P.Func = func() { switch P.Next { case 0: P.Next = 1 @@ -505,20 +536,20 @@ func DemoCompiled() *io.PromiseImpl[io.Void] { return case 1: P.Next = 2 - user, err := state1.Value(), state1.Err() + user, err := state1.Value().Values() log.Printf("user: %+v, err: %v\n", user, err) - state2 = io.Race[User](GetUserCompiled("2"), GetUserCompiled("3"), GetUserCompiled("4")) + state2 = io.Race[io.R2[User, error]](GetUserCompiled("2"), GetUserCompiled("3"), GetUserCompiled("4")) state2.Exec = P.Exec state2.Parent = P state2.Call() return case 2: P.Next = 3 - user, err := state2.Value(), state2.Err() + user, err := state2.Value().Values() log.Printf("race user: %+v, err: %v\n", user, err) - state3 = io.All[User]([]io.AsyncCall[User]{GetUserCompiled("5"), GetUserCompiled("6"), GetUserCompiled("7")}) + state3 = io.All[io.R2[User, error]]([]io.AsyncCall[io.R2[User, error]]{GetUserCompiled("5"), GetUserCompiled("6"), GetUserCompiled("7")}) state3.Exec = P.Exec state3.Parent = P state3.Call() @@ -526,18 +557,18 @@ func DemoCompiled() *io.PromiseImpl[io.Void] { case 3: P.Next = 4 - users, err := state3.Value(), state3.Err() - log.Println(users, err) + users := state3.Value() + log.Println(users) - state4 = io.Await3Compiled[User, float64, io.Void](GetUserCompiled("8"), GetScoreCompiled(), DoUpdateCompiled("update sth.")) + state4 = io.Await3Compiled[io.R2[User, error], io.R2[float64, error], error](GetUserCompiled("8"), GetScoreCompiled(), DoUpdateCompiled("update sth.")) state4.Exec = P.Exec state4.Parent = P state4.Call() return case 4: P.Next = 5 - user, score, _, err := state4.Value().V1, state4.Value().V2, state4.Value().V3, state4.Value().Err - log.Println(user, score, err) + user, score, _ := state4.Value().Values() + log.Println(user, score) g1 = GenIntsCompiled() for { @@ -555,13 +586,15 @@ func DemoCompiled() *io.PromiseImpl[io.Void] { g2.Call() return case 5: - if g2.Err() != nil { + g2.Call() + if g2.Done() { P.Next = -1 - resolve(io.Void{}, nil) + log.Printf("Demo done\n") + P.Return(io.Void{}) return } log.Printf("genUser: %+v, done: %v\n", g2.Value(), g2.Done()) - g2.Call() + return default: panic("Promise already done") } @@ -572,6 +605,6 @@ func DemoCompiled() *io.PromiseImpl[io.Void] { func main() { log.SetFlags(log.Lshortfile | log.LstdFlags) // io.Run(Demo()) - v, err := io.Run[io.Void](DemoCompiled()) - log.Println(v, err) + v := io.Run[io.Void](DemoCompiled()) + log.Println(v) } diff --git a/x/io/extra.go b/x/io/extra.go index 7ea8df75..802a53b1 100644 --- a/x/io/extra.go +++ b/x/io/extra.go @@ -33,13 +33,13 @@ func Await[OutT any](call AsyncCall[OutT], timeout ...time.Duration) (ret OutT, //go:linkname Timeout llgo.timeout func Timeout(time.Duration) (ret AsyncCall[Void]) -func TimeoutCompiled(d time.Duration) *PromiseImpl[Void] { - P := &PromiseImpl[Void]{} +func TimeoutCompiled(d time.Duration) *Promise[Void] { + P := &Promise[Void]{} P.Debug = "Timeout" - P.Func = func(resolve func(Void, error)) { + P.Func = func() { go func() { time.Sleep(d) - resolve(Void{}, nil) + P.Return(Void{}) }() } return P @@ -50,20 +50,19 @@ type Result[T any] struct { Err error } -// llgo:link Race llgo.race -func Race[OutT any](acs ...AsyncCall[OutT]) *PromiseImpl[OutT] { +func Race[OutT any](acs ...AsyncCall[OutT]) *Promise[OutT] { if len(acs) == 0 { - panic("face: no promise") + panic("race: no promise") } - ps := make([]*PromiseImpl[OutT], len(acs)) + ps := make([]*Promise[OutT], len(acs)) for idx, ac := range acs { - ps[idx] = ac.(*PromiseImpl[OutT]) + ps[idx] = ac.(*Promise[OutT]) } remaining := len(acs) returned := false - P := &PromiseImpl[OutT]{} + P := &Promise[OutT]{} P.Debug = "Race" - P.Func = func(resolve func(OutT, error)) { + P.Func = func() { switch P.Next { case 0: P.Next = 1 @@ -88,7 +87,7 @@ func Race[OutT any](acs ...AsyncCall[OutT]) *PromiseImpl[OutT] { log.Printf("io.Race done: %+v won the race\n", p) } returned = true - resolve(p.value, p.err) + P.Return(p.value) return } } @@ -101,15 +100,15 @@ func Race[OutT any](acs ...AsyncCall[OutT]) *PromiseImpl[OutT] { return P } -func All[OutT any](acs []AsyncCall[OutT]) *PromiseImpl[[]Result[OutT]] { - ps := make([]*PromiseImpl[OutT], len(acs)) +func All[OutT any](acs []AsyncCall[OutT]) *Promise[[]OutT] { + ps := make([]*Promise[OutT], len(acs)) for idx, ac := range acs { - ps[idx] = ac.(*PromiseImpl[OutT]) + ps[idx] = ac.(*Promise[OutT]) } done := 0 - P := &PromiseImpl[[]Result[OutT]]{} + P := &Promise[[]OutT]{} P.Debug = "All" - P.Func = func(resolve func([]Result[OutT], error)) { + P.Func = func() { switch P.Next { case 0: P.Next = 1 @@ -132,14 +131,14 @@ func All[OutT any](acs []AsyncCall[OutT]) *PromiseImpl[[]Result[OutT]] { } } - ret := make([]Result[OutT], len(acs)) + ret := make([]OutT, len(acs)) for idx, p := range ps { - ret[idx] = Result[OutT]{p.value, p.err} + ret[idx] = p.value } if debugAsync { log.Printf("io.All done: %+v\n", ret) } - resolve(ret, nil) + P.Return(ret) return default: panic("unreachable") @@ -149,27 +148,15 @@ func All[OutT any](acs []AsyncCall[OutT]) *PromiseImpl[[]Result[OutT]] { } // llgo:link Await2 llgo.await -func Await2[OutT1, OutT2 any]( - ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], - timeout ...time.Duration) (ret1 OutT1, ret2 OutT2, err error) { - return -} - -type Await2Result[T1 any, T2 any] struct { - V1 T1 - V2 T2 - Err error -} - func Await2Compiled[OutT1, OutT2 any]( ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], - timeout ...time.Duration) (ret *PromiseImpl[Await2Result[OutT1, OutT2]]) { - p1 := ac1.(*PromiseImpl[OutT1]) - p2 := ac2.(*PromiseImpl[OutT2]) + timeout ...time.Duration) (ret *Promise[R3[OutT1, OutT2, error]]) { + p1 := ac1.(*Promise[OutT1]) + p2 := ac2.(*Promise[OutT2]) remaining := 2 - P := &PromiseImpl[Await2Result[OutT1, OutT2]]{} + P := &Promise[R3[OutT1, OutT2, error]]{} P.Debug = "Await2" - P.Func = func(resolve func(Await2Result[OutT1, OutT2], error)) { + P.Func = func() { switch P.Next { case 0: P.Next = 1 @@ -191,17 +178,11 @@ func Await2Compiled[OutT1, OutT2 any]( log.Fatalf("io.Await2: not done: %+v, %+v\n", p1, p2) } - var err error - if p1.err != nil { - err = p1.err - } else if p2.err != nil { - err = p2.err - } - - resolve(Await2Result[OutT1, OutT2]{ - V1: p1.value, V2: p2.value, - Err: err, - }, err) + P.Return(R3[OutT1, OutT2, error]{ + V1: p1.value, + V2: p2.value, + V3: nil, + }) return default: panic("unreachable") @@ -210,30 +191,17 @@ func Await2Compiled[OutT1, OutT2 any]( return P } -// llgo:link Await3 llgo.await -func Await3[OutT1, OutT2, OutT3 any]( - ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], ac3 AsyncCall[OutT3], - timeout ...time.Duration) (ret1 OutT1, ret2 OutT2, ret3 OutT3, err error) { - return -} - -type Await3Result[T1 any, T2 any, T3 any] struct { - V1 T1 - V2 T2 - V3 T3 - Err error -} - +// llgo:link Await2 llgo.await func Await3Compiled[OutT1, OutT2, OutT3 any]( ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], ac3 AsyncCall[OutT3], - timeout ...time.Duration) *PromiseImpl[Await3Result[OutT1, OutT2, OutT3]] { - p1 := ac1.(*PromiseImpl[OutT1]) - p2 := ac2.(*PromiseImpl[OutT2]) - p3 := ac3.(*PromiseImpl[OutT3]) + timeout ...time.Duration) *Promise[R3[OutT1, OutT2, OutT3]] { + p1 := ac1.(*Promise[OutT1]) + p2 := ac2.(*Promise[OutT2]) + p3 := ac3.(*Promise[OutT3]) remaining := 3 - P := &PromiseImpl[Await3Result[OutT1, OutT2, OutT3]]{} + P := &Promise[R3[OutT1, OutT2, OutT3]]{} P.Debug = "Await3" - P.Func = func(resolve func(Await3Result[OutT1, OutT2, OutT3], error)) { + P.Func = func() { switch P.Next { case 0: P.Next = 1 @@ -260,19 +228,11 @@ func Await3Compiled[OutT1, OutT2, OutT3 any]( log.Fatalf("io.Await3: not done: %+v, %+v, %+v\n", p1, p2, p3) } - var err error - if p1.err != nil { - err = p1.err - } else if p2.err != nil { - err = p2.err - } else if p3.err != nil { - err = p3.err - } - - resolve(Await3Result[OutT1, OutT2, OutT3]{ - V1: p1.value, V2: p2.value, V3: p3.value, - Err: err, - }, err) + P.Return(R3[OutT1, OutT2, OutT3]{ + V1: p1.value, + V2: p2.value, + V3: p3.value, + }) return default: panic("unreachable") @@ -281,60 +241,50 @@ func Await3Compiled[OutT1, OutT2, OutT3 any]( return P } -// / PAll is a parallel version of All. -func PAll[OutT any](acs ...AsyncCall[OutT]) (resolve Promise[[]Result[OutT]]) { - panic("todo: PAll") -} - -func PAllCompiled[OutT any](acs ...AsyncCall[OutT]) *PromiseImpl[[]Result[OutT]] { - P := &PromiseImpl[[]Result[OutT]]{} +func PAllCompiled[OutT any](acs ...AsyncCall[OutT]) *Promise[[]OutT] { + P := &Promise[[]OutT]{} P.Debug = "Parallel" - P.Func = func(resolve func([]Result[OutT], error)) { - ret := make([]Result[OutT], len(acs)) + P.Func = func() { + ret := make([]OutT, len(acs)) wg := sync.WaitGroup{} for idx, ac := range acs { idx := idx ac := ac wg.Add(1) go func(ac AsyncCall[OutT]) { - v, err := Run[OutT](ac) - ret[idx] = Result[OutT]{v, err} + v := Run[OutT](ac) + ret[idx] = v wg.Done() }(ac) } wg.Wait() - resolve(ret, nil) + P.Return(ret) } return P } -// / PAwait3 is a parallel version of Await3. -func PAwait3[OutT1, OutT2, OutT3 any](ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], ac3 AsyncCall[OutT3]) (resolve Promise[Await3Result[OutT1, OutT2, OutT3]]) { - panic("todo: PAwait2") -} - func PAwait3Compiled[OutT1, OutT2, OutT3 any]( - ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], ac3 AsyncCall[OutT3]) *PromiseImpl[Await3Result[OutT1, OutT2, OutT3]] { - P := &PromiseImpl[Await3Result[OutT1, OutT2, OutT3]]{} + ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], ac3 AsyncCall[OutT3]) *Promise[R4[OutT1, OutT2, OutT3, error]] { + P := &Promise[R4[OutT1, OutT2, OutT3, error]]{} P.Debug = "PAwait3" - P.Func = func(resolve func(Await3Result[OutT1, OutT2, OutT3], error)) { - ret := Await3Result[OutT1, OutT2, OutT3]{} + P.Func = func() { + ret := R4[OutT1, OutT2, OutT3, error]{} wg := sync.WaitGroup{} wg.Add(3) go func() { - ret.V1, ret.Err = Run[OutT1](ac1) + ret.V1 = Run[OutT1](ac1) wg.Done() }() go func() { - ret.V2, ret.Err = Run[OutT2](ac2) + ret.V2 = Run[OutT2](ac2) wg.Done() }() go func() { - ret.V3, ret.Err = Run[OutT3](ac3) + ret.V3 = Run[OutT3](ac3) wg.Done() }() wg.Wait() - resolve(ret, nil) + P.Return(ret) } return P } diff --git a/x/io/io.go b/x/io/io.go index b82c4fcf..16ae959f 100644 --- a/x/io/io.go +++ b/x/io/io.go @@ -20,8 +20,6 @@ import ( "log" "sync" _ "unsafe" - - "time" ) const ( @@ -42,6 +40,7 @@ type asyncCall interface { } type AsyncCall[OutT any] interface { + Resume() } type executor struct { @@ -63,9 +62,9 @@ func (e *executor) schedule(ac asyncCall) { e.cond.Signal() } -func Run[OutT any](ac AsyncCall[OutT]) (OutT, error) { +func Run[OutT any](ac AsyncCall[OutT]) OutT { e := newExecutor() - p := ac.(*PromiseImpl[OutT]) + p := ac.(*Promise[OutT]) p.Exec = e var rootAc asyncCall = p e.schedule(rootAc) @@ -80,102 +79,121 @@ func Run[OutT any](ac AsyncCall[OutT]) (OutT, error) { e.acs = e.acs[1:] ac.Call() if ac.Done() && ac == rootAc { - return p.value, p.err + return p.value } } } // ----------------------------------------------------------------------------- -type Promise[OutT any] func(OutT, error) - -// llgo:link Promise.Await llgo.await -func (p Promise[OutT]) Await(timeout ...time.Duration) (ret OutT, err error) { - panic("should not called") +type R1[T any] struct { + V1 T } -func (p Promise[OutT]) Call() { - panic("should not called") +func (r R1[T]) Values() T { + return r.V1 } -func (p Promise[OutT]) Chan() <-chan OutT { - panic("should not called") +type R2[T1 any, T2 any] struct { + V1 T1 + V2 T2 } -func (p Promise[OutT]) Done() bool { - panic("should not called") +func (r R2[T1, T2]) Values() (T1, T2) { + return r.V1, r.V2 } -func (p Promise[OutT]) Err() error { - panic("should not called") +type R3[T1 any, T2 any, T3 any] struct { + V1 T1 + V2 T2 + V3 T3 } -func (p Promise[OutT]) Value() OutT { - panic("should not called") +func (r R3[T1, T2, T3]) Values() (T1, T2, T3) { + return r.V1, r.V2, r.V3 } -// ----------------------------------------------------------------------------- +type R4[T1 any, T2 any, T3 any, T4 any] struct { + V1 T1 + V2 T2 + V3 T3 + V4 T4 +} -type PromiseImpl[TOut any] struct { +func (r R4[T1, T2, T3, T4]) Values() (T1, T2, T3, T4) { + return r.V1, r.V2, r.V3, r.V4 +} + +type Promise[TOut any] struct { Debug string Next int Exec *executor Parent asyncCall - Func func(resolve func(TOut, error)) - err error + Func func() value TOut c chan TOut } -func (p *PromiseImpl[TOut]) parent() asyncCall { +func NewPromise[TOut any](fn func()) *Promise[TOut] { + return &Promise[TOut]{Func: fn} +} + +func (p *Promise[TOut]) parent() asyncCall { return p.Parent } -func (p *PromiseImpl[TOut]) Resume() { +func (p *Promise[TOut]) Resume() { if debugAsync { log.Printf("Resume task: %+v\n", p) } p.Exec.schedule(p) } -func (p *PromiseImpl[TOut]) Done() bool { +func (p *Promise[TOut]) Done() bool { return p.Next == -1 } -func (p *PromiseImpl[TOut]) Call() { - p.Func(func(v TOut, err error) { - p.value = v - p.err = err - if debugAsync { - log.Printf("Resolve task: %+v, %+v, %+v\n", p, v, err) - } - if p.Parent != nil { - p.Parent.Resume() - } - }) +func (p *Promise[TOut]) Call() { + p.Func() } -func (p *PromiseImpl[TOut]) Err() error { - return p.err +func (p *Promise[TOut]) Return(v TOut) { + // TODO(lijie): panic if already resolved + p.value = v + if p.c != nil { + p.c <- v + } + if debugAsync { + log.Printf("Return task: %+v\n", p) + } + if p.Parent != nil { + p.Parent.Resume() + } } -func (p *PromiseImpl[TOut]) Value() TOut { +func (p *Promise[TOut]) Yield(v TOut) { + p.value = v + if debugAsync { + log.Printf("Yield task: %+v\n", p) + } + if p.Parent != nil { + p.Parent.Resume() + } +} + +func (p *Promise[TOut]) Value() TOut { return p.value } -func (p *PromiseImpl[TOut]) Chan() <-chan TOut { +func (p *Promise[TOut]) Chan() <-chan TOut { if p.c == nil { p.c = make(chan TOut, 1) - p.Func(func(v TOut, err error) { - p.value = v - p.err = err - p.c <- v - }) + p.Func() } return p.c } -func (p *PromiseImpl[TOut]) Await(timeout ...time.Duration) (ret TOut, err error) { +func (p *Promise[TOut]) Await() (ret TOut) { panic("should not called") }