diff --git a/x/io/_demo/asyncdemo/async.go b/x/io/_demo/asyncdemo/async.go index 02b4475a..519f74fb 100644 --- a/x/io/_demo/asyncdemo/async.go +++ b/x/io/_demo/asyncdemo/async.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "log" + "strings" "time" "github.com/goplus/llgo/x/io" @@ -28,12 +29,13 @@ func (r *Response) Text() (resolve io.Promise[string]) { func (r *Response) TextCompiled() *io.PromiseImpl[string] { P := &io.PromiseImpl[string]{} + P.Debug = "Text" P.Func = func(resolve func(string, error)) { for { switch P.Prev = P.Next; P.Prev { case 0: - resolve(r.mockBody, nil) P.Next = -1 + resolve(r.mockBody, nil) return default: panic("Promise already done") @@ -43,24 +45,58 @@ func (r *Response) TextCompiled() *io.PromiseImpl[string] { return P } -func HttpGet(url string, callback func(resp *Response, err error)) { - resp := &Response{StatusCode: 200} - callback(resp, nil) +func Http(method string, url string, callback func(resp *Response, err error)) { + go func() { + body := "" + if strings.HasPrefix(url, "http://example.com/user/") { + name := url[len("http://example.com/user/"):] + body = `{"name":"` + name + `"}` + } else if strings.HasPrefix(url, "http://example.com/score/") { + body = "99.5" + } + time.Sleep(200 * time.Millisecond) + resp := &Response{StatusCode: 200, mockBody: body} + callback(resp, nil) + }() } func AsyncHttpGet(url string) (resolve io.Promise[*Response]) { - HttpGet(url, resolve) + Http("GET", url, resolve) return } func AsyncHttpGetCompiled(url string) *io.PromiseImpl[*Response] { P := &io.PromiseImpl[*Response]{} + P.Debug = "HttpGet" P.Func = func(resolve func(*Response, error)) { for { switch P.Prev = P.Next; P.Prev { case 0: - HttpGet(url, resolve) P.Next = -1 + Http("GET", url, resolve) + return + default: + panic("Promise already done") + } + } + } + return P +} + +func AsyncHttpPost(url string) (resolve io.Promise[*Response]) { + Http("POST", url, resolve) + return +} + +func AsyncHttpPostCompiled(url string) *io.PromiseImpl[*Response] { + P := &io.PromiseImpl[*Response]{} + P.Debug = "HttpPost" + P.Func = func(resolve func(*Response, error)) { + for { + switch P.Prev = P.Next; P.Prev { + case 0: + P.Next = -1 + Http("POST", url, resolve) return default: panic("Promise already done") @@ -76,8 +112,8 @@ type User struct { Name string } -func GetUser(uid string) (resolve io.Promise[User]) { - resp, err := AsyncHttpGet("http://example.com/user/" + uid).Await() +func GetUser(name string) (resolve io.Promise[User]) { + resp, err := AsyncHttpGet("http://example.com/user/" + name).Await() if err != nil { resolve(User{}, err) return @@ -88,8 +124,6 @@ func GetUser(uid string) (resolve io.Promise[User]) { return } - resp.mock(`{"name":"Alice"}`) - body, err := resp.Text().Await() if err != nil { resolve(User{}, err) @@ -105,21 +139,29 @@ func GetUser(uid string) (resolve io.Promise[User]) { return } -func GetUserCompiled(uid string) *io.PromiseImpl[User] { +func GetUserCompiled(name string) *io.PromiseImpl[User] { var state1 *io.PromiseImpl[*Response] var state2 *io.PromiseImpl[string] P := &io.PromiseImpl[User]{} + P.Debug = "GetUser" P.Func = func(resolve func(User, error)) { for { switch P.Prev = P.Next; P.Prev { case 0: - state1 = AsyncHttpGetCompiled("http://example.com/user/" + uid) P.Next = 1 + state1 = AsyncHttpGetCompiled("http://example.com/user/" + name) + state1.Exec = P.Exec + state1.Call() return case 1: - state1.EnsureDone() + if !state1.Done() { + state1.Call() + return + } + P.Next = 2 resp, err := state1.Value, state1.Err + log.Printf("resp: %v, err: %v\n", resp, err) if err != nil { resolve(User{}, err) return @@ -130,26 +172,121 @@ func GetUserCompiled(uid string) *io.PromiseImpl[User] { return } - resp.mock(`{"name":"Alice"}`) - state2 = resp.TextCompiled() - P.Next = 2 + state2.Exec = P.Exec + state2.Call() + log.Printf("TextCompiled state2: %v\n", state2) return case 2: - state2.EnsureDone() + if !state2.Done() { + state2.Call() + return + } + P.Next = -1 body, err := state2.Value, state2.Err if err != nil { resolve(User{}, err) return } user := User{} + log.Printf("body: %v\n", body) if err := json.Unmarshal([]byte(body), &user); err != nil { resolve(User{}, err) return } resolve(user, nil) + return + default: + panic(fmt.Sprintf("Promise already done, %+v", P)) + } + } + } + return P +} + +func GetScore() (resolve io.Promise[float64]) { + resp, err := AsyncHttpGet("http://example.com/score/").Await() + if err != nil { + resolve(0, err) + return + } + + if resp.StatusCode != 200 { + resolve(0, fmt.Errorf("http status code: %d", resp.StatusCode)) + return + } + + body, err := resp.Text().Await() + if err != nil { + resolve(0, err) + return + } + + score := 0.0 + if _, err := fmt.Sscanf(body, "%f", &score); err != nil { + resolve(0, err) + return + } + resolve(score, nil) + return +} + +func GetScoreCompiled() *io.PromiseImpl[float64] { + var state1 *io.PromiseImpl[*Response] + var state2 *io.PromiseImpl[string] + + P := &io.PromiseImpl[float64]{} + P.Debug = "GetScore" + P.Func = func(resolve func(float64, error)) { + for { + switch P.Prev = P.Next; P.Prev { + case 0: + P.Next = 1 + state1 = AsyncHttpGetCompiled("http://example.com/score/") + state1.Exec = P.Exec + state1.Call() + return + case 1: + if !state1.Done() { + state1.Call() + return + } + P.Next = 2 + resp, err := state1.Value, state1.Err + if err != nil { + resolve(0, err) + return + } + + if resp.StatusCode != 200 { + resolve(0, fmt.Errorf("http status code: %d", resp.StatusCode)) + return + } + + state2 = resp.TextCompiled() + state2.Exec = P.Exec + state2.Call() + + return + case 2: + if !state2.Done() { + state2.Call() + return + } P.Next = -1 + body, err := state2.Value, state2.Err + if err != nil { + resolve(0, err) + return + } + + score := 0.0 + if _, err := fmt.Sscanf(body, "%f", &score); err != nil { + resolve(0, err) + return + } + resolve(score, nil) return default: panic("Promise already done") @@ -159,36 +296,55 @@ func GetUserCompiled(uid string) *io.PromiseImpl[User] { return P } -func GetScore() *io.Promise[float64] { - panic("todo: GetScore") -} - -func GetScoreCompiled() *io.PromiseImpl[float64] { - P := &io.PromiseImpl[float64]{} - P.Func = func(resolve func(float64, error)) { - for { - switch P.Prev = P.Next; P.Prev { - case 0: - panic("todo: GetScore") - default: - panic("Promise already done") - } - } +func DoUpdate(op string) (resolve io.Promise[io.Void]) { + resp, err := AsyncHttpPost("http://example.com/update/" + op).Await() + if err != nil { + resolve(io.Void{}, err) + return } - return P -} -func DoUpdate(op string) *io.Promise[io.Void] { - panic("todo: DoUpdate") + if resp.StatusCode != 200 { + resolve(io.Void{}, fmt.Errorf("http status code: %d", resp.StatusCode)) + return + } + + resolve(io.Void{}, nil) + return } func DoUpdateCompiled(op string) *io.PromiseImpl[io.Void] { + var state1 *io.PromiseImpl[*Response] + P := &io.PromiseImpl[io.Void]{} + P.Debug = "DoUpdate" P.Func = func(resolve func(io.Void, error)) { for { switch P.Prev = P.Next; P.Prev { case 0: - panic("todo: DoUpdate") + P.Next = 1 + state1 = AsyncHttpPostCompiled("http://example.com/update/" + op) + state1.Exec = P.Exec + state1.Call() + return + case 1: + if !state1.Done() { + state1.Call() + return + } + P.Next = -1 + resp, err := state1.Value, state1.Err + if err != nil { + resolve(io.Void{}, err) + return + } + + if resp.StatusCode != 200 { + resolve(io.Void{}, fmt.Errorf("http status code: %d", resp.StatusCode)) + return + } + + resolve(io.Void{}, nil) + return default: panic("Promise already done") } @@ -198,82 +354,103 @@ func DoUpdateCompiled(op string) *io.PromiseImpl[io.Void] { } func Demo() (resolve io.Promise[io.Void]) { - user, err := GetUser("123").Await() + user, err := GetUser("1").Await() log.Println(user, err) - user, err = io.Race[User](GetUser("123"), GetUser("456"), GetUser("789")).Await() + user, err = io.Race[User](GetUser("2"), GetUser("3"), GetUser("4")).Await() log.Println(user, err) - users, err := io.All[User]([]io.AsyncCall[User]{GetUser("123"), GetUser("456"), GetUser("789")}).Await() + users, err := io.All[User]([]io.AsyncCall[User]{GetUser("5"), GetUser("6"), GetUser("7")}).Await() log.Println(users, err) - user, score, _, err := io.Await3[User, float64, io.Void](GetUser("123"), GetScore(), DoUpdate("update sth.")) + user, score, _, err := io.Await3[User, float64, io.Void](GetUser("8"), GetScore(), DoUpdate("update sth.")) log.Println(user, score, err) // TODO(lijie): select from multiple promises without channel - select { - case user := <-GetUser("123").Chan(): - log.Println("user:", user) - case score := <-GetScore().Chan(): - log.Println("score:", score) - case <-io.Timeout(5 * time.Second).Chan(): - log.Println("timeout") - } + // select { + // case user := <-GetUser("123").Chan(): + // log.Println("user:", user) + // case score := <-GetScore().Chan(): + // log.Println("score:", score) + // case <-io.Timeout(5 * time.Second).Chan(): + // log.Println("timeout") + // } return } func DemoCompiled() *io.PromiseImpl[io.Void] { var state1 *io.PromiseImpl[User] var state2 *io.PromiseImpl[User] - var state3 *io.PromiseImpl[[]User] + var state3 *io.PromiseImpl[[]io.Result[User]] var state4 *io.PromiseImpl[io.Await3Result[User, float64, io.Void]] P := &io.PromiseImpl[io.Void]{} + P.Debug = "Demo" P.Func = func(resolve func(io.Void, error)) { for { switch P.Prev = P.Next; P.Prev { case 0: - state1 = GetUserCompiled("123") P.Next = 1 + state1 = GetUserCompiled("1") + state1.Exec = P.Exec + state1.Call() return case 1: - state1.EnsureDone() + if !state1.Done() { + state1.Call() + return + } + P.Next = 2 user, err := state1.Value, state1.Err log.Printf("user: %v, err: %v\n", user, err) - state2 = io.Race[User](GetUserCompiled("123"), GetUserCompiled("456"), GetUserCompiled("789")) - P.Next = 2 + state2 = io.Race[User](GetUserCompiled("2"), GetUserCompiled("3"), GetUserCompiled("4")) + log.Printf("state2: %v\n", state2) + state2.Exec = P.Exec + state2.Call() return case 2: - state2.EnsureDone() - user, err := state2.Value, state2.Err - log.Println(user, err) - state3 = io.All[User]([]io.AsyncCall[User]{GetUserCompiled("123"), GetUserCompiled("456"), GetUserCompiled("789")}) + if !state2.Done() { + state2.Call() + return + } + log.Printf("case 2, state2: %+v\n", state2) + P.Next = 3 + user, err := state2.Value, state2.Err + log.Printf("race user: %v, err: %v\n", user, err) + + state3 = io.All[User]([]io.AsyncCall[User]{GetUserCompiled("5"), GetUserCompiled("6"), GetUserCompiled("7")}) + log.Printf("state3: %v\n", state3) + state3.Exec = P.Exec + state3.Call() return case 3: - state3.EnsureDone() + if !state3.Done() { + state3.Call() + return + } + + P.Next = 4 users, err := state3.Value, state3.Err log.Println(users, err) - state4 = io.Await3Compiled[User, float64, io.Void](GetUserCompiled("123"), GetScoreCompiled(), DoUpdateCompiled("update sth.")) - P.Next = 4 + state4 = io.Await3Compiled[User, float64, io.Void](GetUserCompiled("8"), GetScoreCompiled(), DoUpdateCompiled("update sth.")) + log.Printf("state4: %v\n", state4) + state4.Exec = P.Exec + state4.Call() return case 4: - state4.EnsureDone() + if !state4.Done() { + state4.Call() + return + } + + P.Next = -1 user, score, _, err := state4.Value.V1, state4.Value.V2, state4.Value.V3, state4.Value.Err log.Println(user, score, err) - - select { - case user := <-GetUserCompiled("123").Chan(): - log.Println("user:", user) - case score := <-GetScoreCompiled().Chan(): - log.Println("score:", score) - case <-io.TimeoutCompiled(5 * time.Second).Chan(): - log.Println("timeout") - } - P.Next = -1 + resolve(io.Void{}, nil) return default: panic("Promise already done") @@ -286,5 +463,6 @@ func DemoCompiled() *io.PromiseImpl[io.Void] { func main() { log.SetFlags(log.Lshortfile | log.LstdFlags) // io.Run(Demo()) - io.Run(DemoCompiled()) + v, err := io.Run[io.Void](DemoCompiled()) + log.Println(v, err) } diff --git a/x/io/extra.go b/x/io/extra.go new file mode 100644 index 00000000..afd0d72c --- /dev/null +++ b/x/io/extra.go @@ -0,0 +1,177 @@ +/* + * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io + +import ( + "log" + "sync" + "time" + _ "unsafe" +) + +// ----------------------------------------------------------------------------- + +// llgo:link AsyncCall.Await llgo.await +func Await[OutT any](call AsyncCall[OutT], timeout ...time.Duration) (ret OutT, err error) { + return +} + +//go:linkname Timeout llgo.timeout +func Timeout(time.Duration) (ret AsyncCall[Void]) + +func TimeoutCompiled(d time.Duration) *PromiseImpl[Void] { + P := &PromiseImpl[Void]{} + P.Debug = "Timeout" + P.Func = func(resolve func(Void, error)) { + go func() { + time.Sleep(d) + resolve(Void{}, nil) + }() + } + return P +} + +type Result[T any] struct { + V T + Err error +} + +// llgo:link Race llgo.race +func Race[OutT any](acs ...AsyncCall[OutT]) *PromiseImpl[OutT] { + P := &PromiseImpl[OutT]{} + P.Debug = "Race" + P.Func = func(resolve func(OutT, error)) { + P.Next = -1 + rc := make(chan Result[OutT], len(acs)) + for _, ac := range acs { + ac := ac + go func(ac AsyncCall[OutT]) { + v, err := Run(ac) + rc <- Result[OutT]{v, err} + }(ac) + } + + v := <-rc + if debugAsync { + log.Printf("io.Race done: %+v won the race\n", v) + } + resolve(v.V, v.Err) + go func() { + count := 1 + for count < len(acs) { + <-rc + count++ + } + close(rc) + }() + } + return P +} + +func All[OutT any](acs []AsyncCall[OutT]) *PromiseImpl[[]Result[OutT]] { + P := &PromiseImpl[[]Result[OutT]]{} + P.Debug = "All" + P.Func = func(resolve func([]Result[OutT], error)) { + P.Next = -1 + wg := sync.WaitGroup{} + ret := make([]Result[OutT], len(acs)) + for idx, ac := range acs { + idx := idx + ac := ac + wg.Add(1) + go func(ac AsyncCall[OutT]) { + v, err := Run(ac) + ret[idx] = Result[OutT]{v, err} + wg.Done() + }(ac) + } + + wg.Wait() + if debugAsync { + log.Printf("io.All done: %+v\n", ret) + } + resolve(ret, nil) + } + return P +} + +// llgo:link Await2 llgo.await +func Await2[OutT1, OutT2 any]( + ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], + timeout ...time.Duration) (ret1 OutT1, ret2 OutT2, err error) { + return +} + +type Await2Result[T1 any, T2 any] struct { + V1 T1 + V2 T2 + Err error +} + +func Await2Compiled[OutT1, OutT2 any]( + ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], + timeout ...time.Duration) (ret *PromiseImpl[Await2Result[OutT1, OutT2]]) { + return +} + +// llgo:link Await3 llgo.await +func Await3[OutT1, OutT2, OutT3 any]( + ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], ac3 AsyncCall[OutT3], + timeout ...time.Duration) (ret1 OutT1, ret2 OutT2, ret3 OutT3, err error) { + return +} + +type Await3Result[T1 any, T2 any, T3 any] struct { + V1 T1 + V2 T2 + V3 T3 + Err error +} + +// TODO(lijie): rewrite to unblock and avoid goroutine +func Await3Compiled[OutT1, OutT2, OutT3 any]( + ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], ac3 AsyncCall[OutT3], + timeout ...time.Duration) *PromiseImpl[Await3Result[OutT1, OutT2, OutT3]] { + P := &PromiseImpl[Await3Result[OutT1, OutT2, OutT3]]{} + P.Debug = "Await3" + P.Func = func(resolve func(Await3Result[OutT1, OutT2, OutT3], error)) { + P.Next = -1 + + ret := Await3Result[OutT1, OutT2, OutT3]{} + wg := sync.WaitGroup{} + wg.Add(3) + + go func() { + defer wg.Done() + ret.V1, ret.Err = Run(ac1) + }() + go func() { + defer wg.Done() + ret.V2, ret.Err = Run(ac2) + }() + go func() { + defer wg.Done() + ret.V3, ret.Err = Run(ac3) + }() + wg.Wait() + if debugAsync { + log.Printf("Await3 done: %+v\n", ret) + } + resolve(ret, nil) + } + return P +} diff --git a/x/io/io.go b/x/io/io.go index f4b4c18b..f116ad93 100644 --- a/x/io/io.go +++ b/x/io/io.go @@ -17,6 +17,8 @@ package io import ( + "log" + "sync" _ "unsafe" "time" @@ -26,87 +28,62 @@ const ( LLGoPackage = "decl" ) +var debugAsync = false + type Void = [0]byte // ----------------------------------------------------------------------------- +type asyncCall interface { + Resume() + Call() + Done() bool +} + type AsyncCall[OutT any] interface { + Call() Await(timeout ...time.Duration) (ret OutT, err error) Chan() <-chan OutT - EnsureDone() + Done() bool } -// llgo:link AsyncCall.Await llgo.await -func Await[OutT any](call AsyncCall[OutT], timeout ...time.Duration) (ret OutT, err error) { - return +type executor struct { + ac asyncCall + mu sync.Mutex + cond *sync.Cond + susp bool } -//go:linkname Timeout llgo.timeout -func Timeout(time.Duration) (ret AsyncCall[Void]) +func newExecutor() *executor { + e := &executor{} + e.cond = sync.NewCond(&e.mu) + return e +} -func TimeoutCompiled(d time.Duration) *PromiseImpl[Void] { - P := &PromiseImpl[Void]{} - P.Func = func(resolve func(Void, error)) { - go func() { - time.Sleep(d) - resolve(Void{}, nil) - }() +func (e *executor) Resume() { + e.mu.Lock() + defer e.mu.Unlock() + e.susp = false + e.cond.Signal() +} + +func Run[OutT any](ac AsyncCall[OutT]) (OutT, error) { + e := newExecutor() + p := ac.(*PromiseImpl[OutT]) + p.Exec = e + + for { + e.mu.Lock() + for e.susp { + e.cond.Wait() + } + e.mu.Unlock() + e.susp = true + if ac.Done() { + return p.Value, p.Err + } + ac.Call() } - return P -} - -// llgo:link Race llgo.race -func Race[OutT any](acs ...AsyncCall[OutT]) (ret *PromiseImpl[OutT]) { - return -} - -func All[OutT any](acs []AsyncCall[OutT]) (ret *PromiseImpl[[]OutT]) { - return nil -} - -// llgo:link Await2 llgo.await -func Await2[OutT1, OutT2 any]( - ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], - timeout ...time.Duration) (ret1 OutT1, ret2 OutT2, err error) { - return -} - -type Await2Result[T1 any, T2 any] struct { - V1 T1 - V2 T2 - Err error -} - -func Await2Compiled[OutT1, OutT2 any]( - ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], - timeout ...time.Duration) (ret *PromiseImpl[Await2Result[OutT1, OutT2]]) { - return -} - -// llgo:link Await3 llgo.await -func Await3[OutT1, OutT2, OutT3 any]( - ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], ac3 AsyncCall[OutT3], - timeout ...time.Duration) (ret1 OutT1, ret2 OutT2, ret3 OutT3, err error) { - return -} - -type Await3Result[T1 any, T2 any, T3 any] struct { - V1 T1 - V2 T2 - V3 T3 - Err error -} - -func Await3Compiled[OutT1, OutT2, OutT3 any]( - ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], ac3 AsyncCall[OutT3], - timeout ...time.Duration) (ret *PromiseImpl[Await3Result[OutT1, OutT2, OutT3]]) { - return -} - -func Run(ac AsyncCall[Void]) { - p := ac.(*PromiseImpl[Void]) - p.Resume() - <-ac.Chan() } // ----------------------------------------------------------------------------- @@ -118,37 +95,49 @@ func (p Promise[OutT]) Await(timeout ...time.Duration) (ret OutT, err error) { return } +func (p Promise[OutT]) Call() { + +} + func (p Promise[OutT]) Chan() <-chan OutT { return nil } -func (p Promise[OutT]) EnsureDone() { - +func (p Promise[OutT]) Done() bool { + return false } // ----------------------------------------------------------------------------- type PromiseImpl[TOut any] struct { - Func func(resolve func(TOut, error)) - Value TOut - Err error Prev int Next int + Exec *executor + Debug string - c chan TOut + Func func(resolve func(TOut, error)) + Err error + Value TOut + c chan TOut } func (p *PromiseImpl[TOut]) Resume() { - p.Func(func(v TOut, err error) { - p.Value = v - p.Err = err - }) + p.Exec.Resume() } -func (p *PromiseImpl[TOut]) EnsureDone() { - if p.Next == -1 { - panic("Promise already done") - } +func (p *PromiseImpl[TOut]) Done() bool { + return p.Next == -1 +} + +func (p *PromiseImpl[TOut]) Call() { + p.Func(func(v TOut, err error) { + if debugAsync { + log.Printf("Resolve task: %+v, %+v, %+v\n", p, v, err) + } + p.Value = v + p.Err = err + p.Resume() + }) } func (p *PromiseImpl[TOut]) Chan() <-chan TOut {