diff --git a/x/io/_demo/asyncdemo/async.go b/x/io/_demo/asyncdemo/async.go index 171301a9..02b4475a 100644 --- a/x/io/_demo/asyncdemo/async.go +++ b/x/io/_demo/asyncdemo/async.go @@ -3,11 +3,14 @@ package main import ( "encoding/json" "fmt" + "log" "time" "github.com/goplus/llgo/x/io" ) +// ----------------------------------------------------------------------------- + type Response struct { StatusCode int @@ -23,8 +26,26 @@ func (r *Response) Text() (resolve io.Promise[string]) { return } +func (r *Response) TextCompiled() *io.PromiseImpl[string] { + P := &io.PromiseImpl[string]{} + P.Func = func(resolve func(string, error)) { + for { + switch P.Prev = P.Next; P.Prev { + case 0: + resolve(r.mockBody, nil) + P.Next = -1 + return + default: + panic("Promise already done") + } + } + } + return P +} + func HttpGet(url string, callback func(resp *Response, err error)) { - panic("todo: Get") + resp := &Response{StatusCode: 200} + callback(resp, nil) } func AsyncHttpGet(url string) (resolve io.Promise[*Response]) { @@ -32,6 +53,25 @@ func AsyncHttpGet(url string) (resolve io.Promise[*Response]) { return } +func AsyncHttpGetCompiled(url string) *io.PromiseImpl[*Response] { + P := &io.PromiseImpl[*Response]{} + P.Func = func(resolve func(*Response, error)) { + for { + switch P.Prev = P.Next; P.Prev { + case 0: + HttpGet(url, resolve) + P.Next = -1 + return + default: + panic("Promise already done") + } + } + } + return P +} + +// ----------------------------------------------------------------------------- + type User struct { Name string } @@ -65,33 +105,186 @@ func GetUser(uid string) (resolve io.Promise[User]) { return } +func GetUserCompiled(uid string) *io.PromiseImpl[User] { + var state1 *io.PromiseImpl[*Response] + var state2 *io.PromiseImpl[string] + + P := &io.PromiseImpl[User]{} + P.Func = func(resolve func(User, error)) { + for { + switch P.Prev = P.Next; P.Prev { + case 0: + state1 = AsyncHttpGetCompiled("http://example.com/user/" + uid) + P.Next = 1 + return + case 1: + state1.EnsureDone() + resp, err := state1.Value, state1.Err + if err != nil { + resolve(User{}, err) + return + } + + if resp.StatusCode != 200 { + resolve(User{}, fmt.Errorf("http status code: %d", resp.StatusCode)) + return + } + + resp.mock(`{"name":"Alice"}`) + + state2 = resp.TextCompiled() + P.Next = 2 + return + case 2: + state2.EnsureDone() + body, err := state2.Value, state2.Err + if err != nil { + resolve(User{}, err) + return + } + user := User{} + if err := json.Unmarshal([]byte(body), &user); err != nil { + resolve(User{}, err) + return + } + + resolve(user, nil) + P.Next = -1 + return + default: + panic("Promise already done") + } + } + } + return P +} + func GetScore() *io.Promise[float64] { panic("todo: GetScore") } +func GetScoreCompiled() *io.PromiseImpl[float64] { + P := &io.PromiseImpl[float64]{} + P.Func = func(resolve func(float64, error)) { + for { + switch P.Prev = P.Next; P.Prev { + case 0: + panic("todo: GetScore") + default: + panic("Promise already done") + } + } + } + return P +} + func DoUpdate(op string) *io.Promise[io.Void] { panic("todo: DoUpdate") } -func main() { +func DoUpdateCompiled(op string) *io.PromiseImpl[io.Void] { + P := &io.PromiseImpl[io.Void]{} + P.Func = func(resolve func(io.Void, error)) { + for { + switch P.Prev = P.Next; P.Prev { + case 0: + panic("todo: DoUpdate") + default: + panic("Promise already done") + } + } + } + return P +} + +func Demo() (resolve io.Promise[io.Void]) { user, err := GetUser("123").Await() - fmt.Println(user, err) + log.Println(user, err) user, err = io.Race[User](GetUser("123"), GetUser("456"), GetUser("789")).Await() - fmt.Println(user, err) + log.Println(user, err) users, err := io.All[User]([]io.AsyncCall[User]{GetUser("123"), GetUser("456"), GetUser("789")}).Await() - fmt.Println(users, err) + log.Println(users, err) user, score, _, err := io.Await3[User, float64, io.Void](GetUser("123"), GetScore(), DoUpdate("update sth.")) - fmt.Println(user, score, err) + log.Println(user, score, err) + // TODO(lijie): select from multiple promises without channel select { case user := <-GetUser("123").Chan(): - fmt.Println("user:", user) + log.Println("user:", user) case score := <-GetScore().Chan(): - fmt.Println("score:", score) + log.Println("score:", score) case <-io.Timeout(5 * time.Second).Chan(): - fmt.Println("timeout") + log.Println("timeout") } + return +} + +func DemoCompiled() *io.PromiseImpl[io.Void] { + var state1 *io.PromiseImpl[User] + var state2 *io.PromiseImpl[User] + var state3 *io.PromiseImpl[[]User] + var state4 *io.PromiseImpl[io.Await3Result[User, float64, io.Void]] + + P := &io.PromiseImpl[io.Void]{} + P.Func = func(resolve func(io.Void, error)) { + for { + switch P.Prev = P.Next; P.Prev { + case 0: + state1 = GetUserCompiled("123") + P.Next = 1 + return + case 1: + state1.EnsureDone() + user, err := state1.Value, state1.Err + log.Printf("user: %v, err: %v\n", user, err) + + state2 = io.Race[User](GetUserCompiled("123"), GetUserCompiled("456"), GetUserCompiled("789")) + P.Next = 2 + return + case 2: + state2.EnsureDone() + user, err := state2.Value, state2.Err + log.Println(user, err) + + state3 = io.All[User]([]io.AsyncCall[User]{GetUserCompiled("123"), GetUserCompiled("456"), GetUserCompiled("789")}) + P.Next = 3 + return + case 3: + state3.EnsureDone() + users, err := state3.Value, state3.Err + log.Println(users, err) + + state4 = io.Await3Compiled[User, float64, io.Void](GetUserCompiled("123"), GetScoreCompiled(), DoUpdateCompiled("update sth.")) + P.Next = 4 + return + case 4: + state4.EnsureDone() + user, score, _, err := state4.Value.V1, state4.Value.V2, state4.Value.V3, state4.Value.Err + log.Println(user, score, err) + + select { + case user := <-GetUserCompiled("123").Chan(): + log.Println("user:", user) + case score := <-GetScoreCompiled().Chan(): + log.Println("score:", score) + case <-io.TimeoutCompiled(5 * time.Second).Chan(): + log.Println("timeout") + } + P.Next = -1 + return + default: + panic("Promise already done") + } + } + } + return P +} + +func main() { + log.SetFlags(log.Lshortfile | log.LstdFlags) + // io.Run(Demo()) + io.Run(DemoCompiled()) } diff --git a/x/io/io.go b/x/io/io.go index fde7428f..f4b4c18b 100644 --- a/x/io/io.go +++ b/x/io/io.go @@ -33,6 +33,7 @@ type Void = [0]byte type AsyncCall[OutT any] interface { Await(timeout ...time.Duration) (ret OutT, err error) Chan() <-chan OutT + EnsureDone() } // llgo:link AsyncCall.Await llgo.await @@ -43,12 +44,23 @@ func Await[OutT any](call AsyncCall[OutT], timeout ...time.Duration) (ret OutT, //go:linkname Timeout llgo.timeout func Timeout(time.Duration) (ret AsyncCall[Void]) +func TimeoutCompiled(d time.Duration) *PromiseImpl[Void] { + P := &PromiseImpl[Void]{} + P.Func = func(resolve func(Void, error)) { + go func() { + time.Sleep(d) + resolve(Void{}, nil) + }() + } + return P +} + // llgo:link Race llgo.race -func Race[OutT any](acs ...AsyncCall[OutT]) (ret AsyncCall[OutT]) { +func Race[OutT any](acs ...AsyncCall[OutT]) (ret *PromiseImpl[OutT]) { return } -func All[OutT any](acs []AsyncCall[OutT]) (ret AsyncCall[[]OutT]) { +func All[OutT any](acs []AsyncCall[OutT]) (ret *PromiseImpl[[]OutT]) { return nil } @@ -59,6 +71,18 @@ func Await2[OutT1, OutT2 any]( return } +type Await2Result[T1 any, T2 any] struct { + V1 T1 + V2 T2 + Err error +} + +func Await2Compiled[OutT1, OutT2 any]( + ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], + timeout ...time.Duration) (ret *PromiseImpl[Await2Result[OutT1, OutT2]]) { + return +} + // llgo:link Await3 llgo.await func Await3[OutT1, OutT2, OutT3 any]( ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], ac3 AsyncCall[OutT3], @@ -66,6 +90,25 @@ func Await3[OutT1, OutT2, OutT3 any]( return } +type Await3Result[T1 any, T2 any, T3 any] struct { + V1 T1 + V2 T2 + V3 T3 + Err error +} + +func Await3Compiled[OutT1, OutT2, OutT3 any]( + ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], ac3 AsyncCall[OutT3], + timeout ...time.Duration) (ret *PromiseImpl[Await3Result[OutT1, OutT2, OutT3]]) { + return +} + +func Run(ac AsyncCall[Void]) { + p := ac.(*PromiseImpl[Void]) + p.Resume() + <-ac.Chan() +} + // ----------------------------------------------------------------------------- type Promise[OutT any] func(OutT, error) @@ -79,4 +122,49 @@ func (p Promise[OutT]) Chan() <-chan OutT { return nil } +func (p Promise[OutT]) EnsureDone() { + +} + +// ----------------------------------------------------------------------------- + +type PromiseImpl[TOut any] struct { + Func func(resolve func(TOut, error)) + Value TOut + Err error + Prev int + Next int + + c chan TOut +} + +func (p *PromiseImpl[TOut]) Resume() { + p.Func(func(v TOut, err error) { + p.Value = v + p.Err = err + }) +} + +func (p *PromiseImpl[TOut]) EnsureDone() { + if p.Next == -1 { + panic("Promise already done") + } +} + +func (p *PromiseImpl[TOut]) Chan() <-chan TOut { + if p.c == nil { + p.c = make(chan TOut, 1) + p.Func(func(v TOut, err error) { + p.Value = v + p.Err = err + p.c <- v + }) + } + return p.c +} + +func (p *PromiseImpl[TOut]) Await(timeout ...time.Duration) (ret TOut, err error) { + panic("should not called") +} + // -----------------------------------------------------------------------------