diff --git a/x/io/_demo/asyncdemo/async.go b/x/io/_demo/asyncdemo/async.go index ff75e943..17138fc3 100644 --- a/x/io/_demo/asyncdemo/async.go +++ b/x/io/_demo/asyncdemo/async.go @@ -27,15 +27,13 @@ func (r *Response) TextCompiled() *io.PromiseImpl[string] { P := &io.PromiseImpl[string]{} P.Debug = "Text" P.Func = func(resolve func(string, error)) { - for { - switch P.Next { - case 0: - P.Next = -1 - resolve(r.Body, nil) - return - default: - panic("Promise already done") - } + switch P.Next { + case 0: + P.Next = -1 + resolve(r.Body, nil) + return + default: + panic("Promise already done") } } return P @@ -65,15 +63,13 @@ func AsyncHttpGetCompiled(url string) *io.PromiseImpl[*Response] { P := &io.PromiseImpl[*Response]{} P.Debug = "HttpGet" P.Func = func(resolve func(*Response, error)) { - for { - switch P.Next { - case 0: - P.Next = -1 - Http("GET", url, resolve) - return - default: - panic("Promise already done") - } + switch P.Next { + case 0: + P.Next = -1 + Http("GET", url, resolve) + return + default: + panic("Promise already done") } } return P @@ -88,15 +84,13 @@ func AsyncHttpPostCompiled(url string) *io.PromiseImpl[*Response] { P := &io.PromiseImpl[*Response]{} P.Debug = "HttpPost" P.Func = func(resolve func(*Response, error)) { - for { - switch P.Next { - case 0: - P.Next = -1 - Http("POST", url, resolve) - return - default: - panic("Promise already done") - } + switch P.Next { + case 0: + P.Next = -1 + Http("POST", url, resolve) + return + default: + panic("Promise already done") } } return P @@ -142,55 +136,52 @@ func GetUserCompiled(name string) *io.PromiseImpl[User] { P := &io.PromiseImpl[User]{} P.Debug = "GetUser" P.Func = func(resolve func(User, error)) { - for { - switch P.Next { - case 0: - P.Next = 1 - state1 = AsyncHttpGetCompiled("http://example.com/user/" + name) - state1.Exec = P.Exec - state1.Parent = P - state1.Call() + switch P.Next { + case 0: + P.Next = 1 + state1 = AsyncHttpGetCompiled("http://example.com/user/" + name) + state1.Exec = P.Exec + state1.Parent = P + state1.Call() + return + case 1: + P.Next = 2 + resp, err := state1.Value(), state1.Err() + log.Printf("resp: %v, err: %v\n", resp, err) + if err != nil { + resolve(User{}, err) return - case 1: - P.Next = 2 - resp, err := state1.Value, state1.Err - log.Printf("resp: %v, err: %v\n", resp, err) - if err != nil { - resolve(User{}, err) - return - } - - if resp.StatusCode != 200 { - resolve(User{}, fmt.Errorf("http status code: %d", resp.StatusCode)) - return - } - - state2 = resp.TextCompiled() - state2.Exec = P.Exec - state2.Parent = P - state2.Call() - log.Printf("TextCompiled state2: %+v\n", state2) - return - case 2: - P.Next = -1 - log.Printf("TextCompiled state2: %+v\n", state2) - body, err := state2.Value, state2.Err - if err != nil { - resolve(User{}, err) - return - } - user := User{} - log.Printf("body: %v\n", body) - if err := json.Unmarshal([]byte(body), &user); err != nil { - resolve(User{}, err) - return - } - - resolve(user, nil) - return - default: - panic(fmt.Sprintf("Promise already done, %+v", P)) } + + if resp.StatusCode != 200 { + resolve(User{}, fmt.Errorf("http status code: %d", resp.StatusCode)) + return + } + + state2 = resp.TextCompiled() + state2.Exec = P.Exec + state2.Parent = P + state2.Call() + return + case 2: + P.Next = -1 + body, err := state2.Value(), state2.Err() + if err != nil { + resolve(User{}, err) + return + } + user := User{} + log.Printf("body: %v\n", body) + if err := json.Unmarshal([]byte(body), &user); err != nil { + resolve(User{}, err) + return + } + + log.Printf("resolve user: %+v\n", user) + resolve(user, nil) + return + default: + panic(fmt.Sprintf("Promise already done, %+v", P)) } } return P @@ -230,52 +221,50 @@ func GetScoreCompiled() *io.PromiseImpl[float64] { P := &io.PromiseImpl[float64]{} P.Debug = "GetScore" P.Func = func(resolve func(float64, error)) { - for { - switch P.Next { - case 0: - P.Next = 1 - state1 = AsyncHttpGetCompiled("http://example.com/score/") - state1.Exec = P.Exec - state1.Parent = P - state1.Call() + switch P.Next { + case 0: + P.Next = 1 + state1 = AsyncHttpGetCompiled("http://example.com/score/") + state1.Exec = P.Exec + state1.Parent = P + state1.Call() + return + case 1: + P.Next = 2 + resp, err := state1.Value(), state1.Err() + if err != nil { + resolve(0, err) return - case 1: - P.Next = 2 - resp, err := state1.Value, state1.Err - if err != nil { - resolve(0, err) - return - } - - if resp.StatusCode != 200 { - resolve(0, fmt.Errorf("http status code: %d", resp.StatusCode)) - return - } - - state2 = resp.TextCompiled() - state2.Exec = P.Exec - state2.Parent = P - state2.Call() - - return - case 2: - P.Next = -1 - body, err := state2.Value, state2.Err - if err != nil { - resolve(0, err) - return - } - - score := 0.0 - if _, err := fmt.Sscanf(body, "%f", &score); err != nil { - resolve(0, err) - return - } - resolve(score, nil) - return - default: - panic("Promise already done") } + + if resp.StatusCode != 200 { + resolve(0, fmt.Errorf("http status code: %d", resp.StatusCode)) + return + } + + state2 = resp.TextCompiled() + state2.Exec = P.Exec + state2.Parent = P + state2.Call() + + return + case 2: + P.Next = -1 + body, err := state2.Value(), state2.Err() + if err != nil { + resolve(0, err) + return + } + + score := 0.0 + if _, err := fmt.Sscanf(body, "%f", &score); err != nil { + resolve(0, err) + return + } + resolve(score, nil) + return + default: + panic("Promise already done") } } return P @@ -303,33 +292,131 @@ func DoUpdateCompiled(op string) *io.PromiseImpl[io.Void] { P := &io.PromiseImpl[io.Void]{} P.Debug = "DoUpdate" P.Func = func(resolve func(io.Void, error)) { - for { - switch P.Next { - case 0: - P.Next = 1 - state1 = AsyncHttpPostCompiled("http://example.com/update/" + op) - state1.Exec = P.Exec - state1.Parent = P - state1.Call() + switch P.Next { + case 0: + P.Next = 1 + state1 = AsyncHttpPostCompiled("http://example.com/update/" + op) + state1.Exec = P.Exec + state1.Parent = P + state1.Call() + return + case 1: + P.Next = -1 + resp, err := state1.Value(), state1.Err() + if err != nil { + resolve(io.Void{}, err) return - case 1: - P.Next = -1 - resp, err := state1.Value, state1.Err - if err != nil { - resolve(io.Void{}, err) - return - } - - if resp.StatusCode != 200 { - resolve(io.Void{}, fmt.Errorf("http status code: %d", resp.StatusCode)) - return - } - - resolve(io.Void{}, nil) - return - default: - panic("Promise already done") } + + if resp.StatusCode != 200 { + resolve(io.Void{}, fmt.Errorf("http status code: %d", resp.StatusCode)) + return + } + + resolve(io.Void{}, nil) + return + default: + panic("Promise already done") + } + } + return P +} + +func GenInts() (yield io.Promise[int]) { + yield(3, nil) + yield(2, nil) + yield(5, nil) + return +} + +func GenIntsCompiled() *io.PromiseImpl[int] { + P := &io.PromiseImpl[int]{} + P.Debug = "GenInts" + P.Func = func(resolve func(int, error)) { + switch P.Next { + case 0: + P.Next = 1 + resolve(3, nil) + return + case 1: + P.Next = 2 + resolve(2, nil) + return + case 2: + P.Next = 3 + resolve(5, nil) + return + case 3: + P.Next = -1 + resolve(0, fmt.Errorf("stop")) + return + default: + panic("Generator already done") + } + } + return P +} + +func GenUsers() (yield io.Promise[User]) { + u, _ := GetUser("Alice").Await() + yield(u, nil) + u, _ = GetUser("Bob").Await() + yield(u, nil) + u, _ = GetUser("Cindy").Await() + yield(u, nil) + log.Printf("genUsers done\n") + return +} + +func GenUsersCompiled() (resolve *io.PromiseImpl[User]) { + var state1, state2, state3 *io.PromiseImpl[User] + + P := &io.PromiseImpl[User]{} + P.Debug = "GenUsers" + P.Func = func(resolve func(User, error)) { + switch P.Next { + case 0: + P.Next = 1 + state1 = GetUserCompiled("Alice") + state1.Exec = P.Exec + state1.Parent = P + state1.Call() + return + case 1: + P.Next = 2 + u, _ := state1.Value(), state1.Err() + resolve(u, nil) + return + case 2: + P.Next = 3 + state2 = GetUserCompiled("Bob") + state2.Exec = P.Exec + state2.Parent = P + state2.Call() + return + case 3: + P.Next = 4 + u, _ := state2.Value(), state2.Err() + resolve(u, nil) + return + case 4: + P.Next = 5 + state3 = GetUserCompiled("Cindy") + state3.Exec = P.Exec + state3.Parent = P + state3.Call() + return + case 5: + P.Next = 6 + u, _ := state3.Value(), state3.Err() + resolve(u, nil) + return + case 6: + P.Next = -1 + resolve(User{}, fmt.Errorf("stop")) + return + default: + panic("Generator already done") } } return P @@ -348,6 +435,27 @@ func Demo() (resolve io.Promise[io.Void]) { user, score, _, err := io.Await3[User, float64, io.Void](GetUser("8"), GetScore(), DoUpdate("update sth.")) log.Println(user, score, err) + // for loop with generator + g := GenInts() + for { + g.Call() + if g.Done() { + break + } + log.Println("genInt:", g.Value(), g.Done()) + } + + // for loop with async generator + g1 := GenUsers() + for { + g.Call() + u, err := io.Await[int](g) + if g1.Done() { + break + } + log.Println("genUser:", u, err) + } + // TODO(lijie): select from multiple promises without channel // select { // case user := <-GetUser("123").Chan(): @@ -365,65 +473,81 @@ func DemoCompiled() *io.PromiseImpl[io.Void] { var state2 *io.PromiseImpl[User] var state3 *io.PromiseImpl[[]io.Result[User]] var state4 *io.PromiseImpl[io.Await3Result[User, float64, io.Void]] + var g1 *io.PromiseImpl[int] + var g2 *io.PromiseImpl[User] P := &io.PromiseImpl[io.Void]{} P.Debug = "Demo" P.Func = func(resolve func(io.Void, error)) { - for { - switch P.Next { - case 0: - P.Next = 1 - state1 = GetUserCompiled("1") - state1.Exec = P.Exec - state1.Parent = P - state1.Call() - return - case 1: - P.Next = 2 - user, err := state1.Value, state1.Err - log.Printf("user: %+v, err: %v\n", user, err) + switch P.Next { + case 0: + P.Next = 1 + state1 = GetUserCompiled("1") + state1.Exec = P.Exec + state1.Parent = P + state1.Call() + return + case 1: + P.Next = 2 + user, err := state1.Value(), state1.Err() + log.Printf("user: %+v, err: %v\n", user, err) - state2 = io.Race[User](GetUserCompiled("2"), GetUserCompiled("3"), GetUserCompiled("4")) - log.Printf("state2: %+v\n", state2) - state2.Exec = P.Exec - state2.Parent = P - state2.Call() - return - case 2: - log.Printf("case 2, state2: %+v\n", state2) + state2 = io.Race[User](GetUserCompiled("2"), GetUserCompiled("3"), GetUserCompiled("4")) + state2.Exec = P.Exec + state2.Parent = P + state2.Call() + return + case 2: + P.Next = 3 + user, err := state2.Value(), state2.Err() + log.Printf("race user: %+v, err: %v\n", user, err) - P.Next = 3 - user, err := state2.Value, state2.Err - log.Printf("race user: %+v, err: %v\n", user, err) + state3 = io.All[User]([]io.AsyncCall[User]{GetUserCompiled("5"), GetUserCompiled("6"), GetUserCompiled("7")}) + state3.Exec = P.Exec + state3.Parent = P + state3.Call() + return + case 3: - state3 = io.All[User]([]io.AsyncCall[User]{GetUserCompiled("5"), GetUserCompiled("6"), GetUserCompiled("7")}) - log.Printf("state3: %+v\n", state3) - state3.Exec = P.Exec - state3.Parent = P - state3.Call() - return - case 3: + P.Next = 4 + users, err := state3.Value(), state3.Err() + log.Println(users, err) - P.Next = 4 - users, err := state3.Value, state3.Err - log.Println(users, err) + state4 = io.Await3Compiled[User, float64, io.Void](GetUserCompiled("8"), GetScoreCompiled(), DoUpdateCompiled("update sth.")) + state4.Exec = P.Exec + state4.Parent = P + state4.Call() + return + case 4: + P.Next = 5 + user, score, _, err := state4.Value().V1, state4.Value().V2, state4.Value().V3, state4.Value().Err + log.Println(user, score, err) - state4 = io.Await3Compiled[User, float64, io.Void](GetUserCompiled("8"), GetScoreCompiled(), DoUpdateCompiled("update sth.")) - log.Printf("state4: %+v\n", state4) - state4.Exec = P.Exec - state4.Parent = P - state4.Call() - return - case 4: + g1 = GenIntsCompiled() + for { + g1.Call() + if g1.Done() { + break + } + log.Printf("genInt: %+v, done: %v\n", g1.Value(), g1.Done()) + } + + g2 = GenUsersCompiled() + g2.Exec = P.Exec + g2.Parent = P + g2.Call() + return + case 5: + if g2.Err() != nil { P.Next = -1 - user, score, _, err := state4.Value.V1, state4.Value.V2, state4.Value.V3, state4.Value.Err - log.Println(user, score, err) resolve(io.Void{}, nil) return - default: - panic("Promise already done") } + log.Printf("genUser: %+v, done: %v\n", g2.Value(), g2.Done()) + g2.Call() + default: + panic("Promise already done") } } return P diff --git a/x/io/extra.go b/x/io/extra.go index 163b20e6..7ea8df75 100644 --- a/x/io/extra.go +++ b/x/io/extra.go @@ -88,7 +88,7 @@ func Race[OutT any](acs ...AsyncCall[OutT]) *PromiseImpl[OutT] { log.Printf("io.Race done: %+v won the race\n", p) } returned = true - resolve(p.Value, p.Err) + resolve(p.value, p.err) return } } @@ -134,7 +134,7 @@ func All[OutT any](acs []AsyncCall[OutT]) *PromiseImpl[[]Result[OutT]] { ret := make([]Result[OutT], len(acs)) for idx, p := range ps { - ret[idx] = Result[OutT]{p.Value, p.Err} + ret[idx] = Result[OutT]{p.value, p.err} } if debugAsync { log.Printf("io.All done: %+v\n", ret) @@ -164,7 +164,50 @@ type Await2Result[T1 any, T2 any] struct { func Await2Compiled[OutT1, OutT2 any]( ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], timeout ...time.Duration) (ret *PromiseImpl[Await2Result[OutT1, OutT2]]) { - return + p1 := ac1.(*PromiseImpl[OutT1]) + p2 := ac2.(*PromiseImpl[OutT2]) + remaining := 2 + P := &PromiseImpl[Await2Result[OutT1, OutT2]]{} + P.Debug = "Await2" + P.Func = func(resolve func(Await2Result[OutT1, OutT2], error)) { + switch P.Next { + case 0: + P.Next = 1 + p1.Exec = P.Exec + p1.Parent = P + p1.Call() + + p2.Exec = P.Exec + p2.Parent = P + p2.Call() + return + case 1: + remaining-- + if remaining > 0 { + return + } + P.Next = -1 + if !p1.Done() || !p2.Done() { + log.Fatalf("io.Await2: not done: %+v, %+v\n", p1, p2) + } + + var err error + if p1.err != nil { + err = p1.err + } else if p2.err != nil { + err = p2.err + } + + resolve(Await2Result[OutT1, OutT2]{ + V1: p1.value, V2: p2.value, + Err: err, + }, err) + return + default: + panic("unreachable") + } + } + return P } // llgo:link Await3 llgo.await @@ -218,16 +261,16 @@ func Await3Compiled[OutT1, OutT2, OutT3 any]( } var err error - if p1.Err != nil { - err = p1.Err - } else if p2.Err != nil { - err = p2.Err - } else if p3.Err != nil { - err = p3.Err + if p1.err != nil { + err = p1.err + } else if p2.err != nil { + err = p2.err + } else if p3.err != nil { + err = p3.err } resolve(Await3Result[OutT1, OutT2, OutT3]{ - V1: p1.Value, V2: p2.Value, V3: p3.Value, + V1: p1.value, V2: p2.value, V3: p3.value, Err: err, }, err) return @@ -273,7 +316,7 @@ func PAwait3[OutT1, OutT2, OutT3 any](ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2] func PAwait3Compiled[OutT1, OutT2, OutT3 any]( ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], ac3 AsyncCall[OutT3]) *PromiseImpl[Await3Result[OutT1, OutT2, OutT3]] { P := &PromiseImpl[Await3Result[OutT1, OutT2, OutT3]]{} - P.Debug = "Parallel3" + P.Debug = "PAwait3" P.Func = func(resolve func(Await3Result[OutT1, OutT2, OutT3], error)) { ret := Await3Result[OutT1, OutT2, OutT3]{} wg := sync.WaitGroup{} diff --git a/x/io/io.go b/x/io/io.go index 86ebaef1..b82c4fcf 100644 --- a/x/io/io.go +++ b/x/io/io.go @@ -80,7 +80,7 @@ func Run[OutT any](ac AsyncCall[OutT]) (OutT, error) { e.acs = e.acs[1:] ac.Call() if ac.Done() && ac == rootAc { - return p.Value, p.Err + return p.value, p.err } } } @@ -91,32 +91,40 @@ type Promise[OutT any] func(OutT, error) // llgo:link Promise.Await llgo.await func (p Promise[OutT]) Await(timeout ...time.Duration) (ret OutT, err error) { - return + panic("should not called") } func (p Promise[OutT]) Call() { - + panic("should not called") } func (p Promise[OutT]) Chan() <-chan OutT { - return nil + panic("should not called") } func (p Promise[OutT]) Done() bool { - return false + panic("should not called") +} + +func (p Promise[OutT]) Err() error { + panic("should not called") +} + +func (p Promise[OutT]) Value() OutT { + panic("should not called") } // ----------------------------------------------------------------------------- type PromiseImpl[TOut any] struct { + Debug string Next int Exec *executor Parent asyncCall - Debug string Func func(resolve func(TOut, error)) - Err error - Value TOut + err error + value TOut c chan TOut } @@ -137,23 +145,31 @@ func (p *PromiseImpl[TOut]) Done() bool { func (p *PromiseImpl[TOut]) Call() { p.Func(func(v TOut, err error) { + p.value = v + p.err = err if debugAsync { log.Printf("Resolve task: %+v, %+v, %+v\n", p, v, err) } - p.Value = v - p.Err = err if p.Parent != nil { p.Parent.Resume() } }) } +func (p *PromiseImpl[TOut]) Err() error { + return p.err +} + +func (p *PromiseImpl[TOut]) Value() TOut { + return p.value +} + func (p *PromiseImpl[TOut]) Chan() <-chan TOut { if p.c == nil { p.c = make(chan TOut, 1) p.Func(func(v TOut, err error) { - p.Value = v - p.Err = err + p.value = v + p.err = err p.c <- v }) } @@ -163,5 +179,3 @@ func (p *PromiseImpl[TOut]) Chan() <-chan TOut { func (p *PromiseImpl[TOut]) Await(timeout ...time.Duration) (ret TOut, err error) { panic("should not called") } - -// -----------------------------------------------------------------------------