diff --git a/x/io/_demo/asyncdemo/async.go b/x/io/_demo/asyncdemo/async.go index 519f74fb..2fdbfa85 100644 --- a/x/io/_demo/asyncdemo/async.go +++ b/x/io/_demo/asyncdemo/async.go @@ -15,15 +15,11 @@ import ( type Response struct { StatusCode int - mockBody string -} - -func (r *Response) mock(body string) { - r.mockBody = body + Body string } func (r *Response) Text() (resolve io.Promise[string]) { - resolve(r.mockBody, nil) + resolve(r.Body, nil) return } @@ -32,10 +28,10 @@ func (r *Response) TextCompiled() *io.PromiseImpl[string] { P.Debug = "Text" P.Func = func(resolve func(string, error)) { for { - switch P.Prev = P.Next; P.Prev { + switch P.Next { case 0: P.Next = -1 - resolve(r.mockBody, nil) + resolve(r.Body, nil) return default: panic("Promise already done") @@ -55,7 +51,7 @@ func Http(method string, url string, callback func(resp *Response, err error)) { body = "99.5" } time.Sleep(200 * time.Millisecond) - resp := &Response{StatusCode: 200, mockBody: body} + resp := &Response{StatusCode: 200, Body: body} callback(resp, nil) }() } @@ -70,7 +66,7 @@ func AsyncHttpGetCompiled(url string) *io.PromiseImpl[*Response] { P.Debug = "HttpGet" P.Func = func(resolve func(*Response, error)) { for { - switch P.Prev = P.Next; P.Prev { + switch P.Next { case 0: P.Next = -1 Http("GET", url, resolve) @@ -93,7 +89,7 @@ func AsyncHttpPostCompiled(url string) *io.PromiseImpl[*Response] { P.Debug = "HttpPost" P.Func = func(resolve func(*Response, error)) { for { - switch P.Prev = P.Next; P.Prev { + switch P.Next { case 0: P.Next = -1 Http("POST", url, resolve) @@ -147,18 +143,15 @@ func GetUserCompiled(name string) *io.PromiseImpl[User] { P.Debug = "GetUser" P.Func = func(resolve func(User, error)) { for { - switch P.Prev = P.Next; P.Prev { + switch P.Next { case 0: P.Next = 1 state1 = AsyncHttpGetCompiled("http://example.com/user/" + name) state1.Exec = P.Exec + state1.Parent = P state1.Call() return case 1: - if !state1.Done() { - state1.Call() - return - } P.Next = 2 resp, err := state1.Value, state1.Err log.Printf("resp: %v, err: %v\n", resp, err) @@ -174,14 +167,11 @@ func GetUserCompiled(name string) *io.PromiseImpl[User] { state2 = resp.TextCompiled() state2.Exec = P.Exec + state2.Parent = P state2.Call() log.Printf("TextCompiled state2: %v\n", state2) return case 2: - if !state2.Done() { - state2.Call() - return - } P.Next = -1 body, err := state2.Value, state2.Err if err != nil { @@ -240,18 +230,15 @@ func GetScoreCompiled() *io.PromiseImpl[float64] { P.Debug = "GetScore" P.Func = func(resolve func(float64, error)) { for { - switch P.Prev = P.Next; P.Prev { + switch P.Next { case 0: P.Next = 1 state1 = AsyncHttpGetCompiled("http://example.com/score/") state1.Exec = P.Exec + state1.Parent = P state1.Call() return case 1: - if !state1.Done() { - state1.Call() - return - } P.Next = 2 resp, err := state1.Value, state1.Err if err != nil { @@ -266,14 +253,11 @@ func GetScoreCompiled() *io.PromiseImpl[float64] { state2 = resp.TextCompiled() state2.Exec = P.Exec + state2.Parent = P state2.Call() return case 2: - if !state2.Done() { - state2.Call() - return - } P.Next = -1 body, err := state2.Value, state2.Err if err != nil { @@ -319,18 +303,15 @@ func DoUpdateCompiled(op string) *io.PromiseImpl[io.Void] { P.Debug = "DoUpdate" P.Func = func(resolve func(io.Void, error)) { for { - switch P.Prev = P.Next; P.Prev { + switch P.Next { case 0: P.Next = 1 state1 = AsyncHttpPostCompiled("http://example.com/update/" + op) state1.Exec = P.Exec + state1.Parent = P state1.Call() return case 1: - if !state1.Done() { - state1.Call() - return - } P.Next = -1 resp, err := state1.Value, state1.Err if err != nil { @@ -388,18 +369,15 @@ func DemoCompiled() *io.PromiseImpl[io.Void] { P.Debug = "Demo" P.Func = func(resolve func(io.Void, error)) { for { - switch P.Prev = P.Next; P.Prev { + switch P.Next { case 0: P.Next = 1 state1 = GetUserCompiled("1") state1.Exec = P.Exec + state1.Parent = P state1.Call() return case 1: - if !state1.Done() { - state1.Call() - return - } P.Next = 2 user, err := state1.Value, state1.Err log.Printf("user: %v, err: %v\n", user, err) @@ -407,14 +385,10 @@ func DemoCompiled() *io.PromiseImpl[io.Void] { state2 = io.Race[User](GetUserCompiled("2"), GetUserCompiled("3"), GetUserCompiled("4")) log.Printf("state2: %v\n", state2) state2.Exec = P.Exec + state2.Parent = P state2.Call() return case 2: - - if !state2.Done() { - state2.Call() - return - } log.Printf("case 2, state2: %+v\n", state2) P.Next = 3 @@ -424,13 +398,10 @@ func DemoCompiled() *io.PromiseImpl[io.Void] { state3 = io.All[User]([]io.AsyncCall[User]{GetUserCompiled("5"), GetUserCompiled("6"), GetUserCompiled("7")}) log.Printf("state3: %v\n", state3) state3.Exec = P.Exec + state3.Parent = P state3.Call() return case 3: - if !state3.Done() { - state3.Call() - return - } P.Next = 4 users, err := state3.Value, state3.Err @@ -439,13 +410,10 @@ func DemoCompiled() *io.PromiseImpl[io.Void] { state4 = io.Await3Compiled[User, float64, io.Void](GetUserCompiled("8"), GetScoreCompiled(), DoUpdateCompiled("update sth.")) log.Printf("state4: %v\n", state4) state4.Exec = P.Exec + state4.Parent = P state4.Call() return case 4: - if !state4.Done() { - state4.Call() - return - } P.Next = -1 user, score, _, err := state4.Value.V1, state4.Value.V2, state4.Value.V3, state4.Value.Err diff --git a/x/io/extra.go b/x/io/extra.go index afd0d72c..cd09f66a 100644 --- a/x/io/extra.go +++ b/x/io/extra.go @@ -60,7 +60,7 @@ func Race[OutT any](acs ...AsyncCall[OutT]) *PromiseImpl[OutT] { for _, ac := range acs { ac := ac go func(ac AsyncCall[OutT]) { - v, err := Run(ac) + v, err := Run[OutT](ac) rc <- Result[OutT]{v, err} }(ac) } @@ -94,7 +94,7 @@ func All[OutT any](acs []AsyncCall[OutT]) *PromiseImpl[[]Result[OutT]] { ac := ac wg.Add(1) go func(ac AsyncCall[OutT]) { - v, err := Run(ac) + v, err := Run[OutT](ac) ret[idx] = Result[OutT]{v, err} wg.Done() }(ac) @@ -157,15 +157,15 @@ func Await3Compiled[OutT1, OutT2, OutT3 any]( go func() { defer wg.Done() - ret.V1, ret.Err = Run(ac1) + ret.V1, ret.Err = Run[OutT1](ac1) }() go func() { defer wg.Done() - ret.V2, ret.Err = Run(ac2) + ret.V2, ret.Err = Run[OutT2](ac2) }() go func() { defer wg.Done() - ret.V3, ret.Err = Run(ac3) + ret.V3, ret.Err = Run[OutT3](ac3) }() wg.Wait() if debugAsync { diff --git a/x/io/io.go b/x/io/io.go index f116ad93..736e5306 100644 --- a/x/io/io.go +++ b/x/io/io.go @@ -35,23 +35,19 @@ type Void = [0]byte // ----------------------------------------------------------------------------- type asyncCall interface { + parent() asyncCall Resume() Call() Done() bool } type AsyncCall[OutT any] interface { - Call() - Await(timeout ...time.Duration) (ret OutT, err error) - Chan() <-chan OutT - Done() bool } type executor struct { - ac asyncCall + acs []asyncCall mu sync.Mutex cond *sync.Cond - susp bool } func newExecutor() *executor { @@ -60,10 +56,10 @@ func newExecutor() *executor { return e } -func (e *executor) Resume() { +func (e *executor) schedule(ac asyncCall) { e.mu.Lock() - defer e.mu.Unlock() - e.susp = false + e.acs = append(e.acs, ac) + e.mu.Unlock() e.cond.Signal() } @@ -71,18 +67,28 @@ func Run[OutT any](ac AsyncCall[OutT]) (OutT, error) { e := newExecutor() p := ac.(*PromiseImpl[OutT]) p.Exec = e + var rootAc asyncCall = p + e.schedule(rootAc) for { e.mu.Lock() - for e.susp { + for len(e.acs) == 0 { e.cond.Wait() } e.mu.Unlock() - e.susp = true + ac := e.acs[0] + e.acs = e.acs[1:] if ac.Done() { - return p.Value, p.Err + if ac == rootAc { + return p.Value, p.Err + } + parent := ac.parent() + if parent != nil { + parent.Resume() + } + } else { + ac.Call() } - ac.Call() } } @@ -110,10 +116,10 @@ func (p Promise[OutT]) Done() bool { // ----------------------------------------------------------------------------- type PromiseImpl[TOut any] struct { - Prev int - Next int - Exec *executor - Debug string + Next int + Exec *executor + Parent asyncCall + Debug string Func func(resolve func(TOut, error)) Err error @@ -121,8 +127,15 @@ type PromiseImpl[TOut any] struct { c chan TOut } +func (p *PromiseImpl[TOut]) parent() asyncCall { + return p.Parent +} + func (p *PromiseImpl[TOut]) Resume() { - p.Exec.Resume() + if debugAsync { + log.Printf("Resume task: %+v\n", p) + } + p.Exec.schedule(p) } func (p *PromiseImpl[TOut]) Done() bool {