asyncio: improve performance by scheduling last frame

This commit is contained in:
Li Jie
2024-07-25 10:49:02 +08:00
parent f2e15a6846
commit a4286dbd4b
3 changed files with 56 additions and 75 deletions

View File

@@ -15,15 +15,11 @@ import (
type Response struct { type Response struct {
StatusCode int StatusCode int
mockBody string Body string
}
func (r *Response) mock(body string) {
r.mockBody = body
} }
func (r *Response) Text() (resolve io.Promise[string]) { func (r *Response) Text() (resolve io.Promise[string]) {
resolve(r.mockBody, nil) resolve(r.Body, nil)
return return
} }
@@ -32,10 +28,10 @@ func (r *Response) TextCompiled() *io.PromiseImpl[string] {
P.Debug = "Text" P.Debug = "Text"
P.Func = func(resolve func(string, error)) { P.Func = func(resolve func(string, error)) {
for { for {
switch P.Prev = P.Next; P.Prev { switch P.Next {
case 0: case 0:
P.Next = -1 P.Next = -1
resolve(r.mockBody, nil) resolve(r.Body, nil)
return return
default: default:
panic("Promise already done") panic("Promise already done")
@@ -55,7 +51,7 @@ func Http(method string, url string, callback func(resp *Response, err error)) {
body = "99.5" body = "99.5"
} }
time.Sleep(200 * time.Millisecond) time.Sleep(200 * time.Millisecond)
resp := &Response{StatusCode: 200, mockBody: body} resp := &Response{StatusCode: 200, Body: body}
callback(resp, nil) callback(resp, nil)
}() }()
} }
@@ -70,7 +66,7 @@ func AsyncHttpGetCompiled(url string) *io.PromiseImpl[*Response] {
P.Debug = "HttpGet" P.Debug = "HttpGet"
P.Func = func(resolve func(*Response, error)) { P.Func = func(resolve func(*Response, error)) {
for { for {
switch P.Prev = P.Next; P.Prev { switch P.Next {
case 0: case 0:
P.Next = -1 P.Next = -1
Http("GET", url, resolve) Http("GET", url, resolve)
@@ -93,7 +89,7 @@ func AsyncHttpPostCompiled(url string) *io.PromiseImpl[*Response] {
P.Debug = "HttpPost" P.Debug = "HttpPost"
P.Func = func(resolve func(*Response, error)) { P.Func = func(resolve func(*Response, error)) {
for { for {
switch P.Prev = P.Next; P.Prev { switch P.Next {
case 0: case 0:
P.Next = -1 P.Next = -1
Http("POST", url, resolve) Http("POST", url, resolve)
@@ -147,18 +143,15 @@ func GetUserCompiled(name string) *io.PromiseImpl[User] {
P.Debug = "GetUser" P.Debug = "GetUser"
P.Func = func(resolve func(User, error)) { P.Func = func(resolve func(User, error)) {
for { for {
switch P.Prev = P.Next; P.Prev { switch P.Next {
case 0: case 0:
P.Next = 1 P.Next = 1
state1 = AsyncHttpGetCompiled("http://example.com/user/" + name) state1 = AsyncHttpGetCompiled("http://example.com/user/" + name)
state1.Exec = P.Exec state1.Exec = P.Exec
state1.Parent = P
state1.Call() state1.Call()
return return
case 1: case 1:
if !state1.Done() {
state1.Call()
return
}
P.Next = 2 P.Next = 2
resp, err := state1.Value, state1.Err resp, err := state1.Value, state1.Err
log.Printf("resp: %v, err: %v\n", resp, err) log.Printf("resp: %v, err: %v\n", resp, err)
@@ -174,14 +167,11 @@ func GetUserCompiled(name string) *io.PromiseImpl[User] {
state2 = resp.TextCompiled() state2 = resp.TextCompiled()
state2.Exec = P.Exec state2.Exec = P.Exec
state2.Parent = P
state2.Call() state2.Call()
log.Printf("TextCompiled state2: %v\n", state2) log.Printf("TextCompiled state2: %v\n", state2)
return return
case 2: case 2:
if !state2.Done() {
state2.Call()
return
}
P.Next = -1 P.Next = -1
body, err := state2.Value, state2.Err body, err := state2.Value, state2.Err
if err != nil { if err != nil {
@@ -240,18 +230,15 @@ func GetScoreCompiled() *io.PromiseImpl[float64] {
P.Debug = "GetScore" P.Debug = "GetScore"
P.Func = func(resolve func(float64, error)) { P.Func = func(resolve func(float64, error)) {
for { for {
switch P.Prev = P.Next; P.Prev { switch P.Next {
case 0: case 0:
P.Next = 1 P.Next = 1
state1 = AsyncHttpGetCompiled("http://example.com/score/") state1 = AsyncHttpGetCompiled("http://example.com/score/")
state1.Exec = P.Exec state1.Exec = P.Exec
state1.Parent = P
state1.Call() state1.Call()
return return
case 1: case 1:
if !state1.Done() {
state1.Call()
return
}
P.Next = 2 P.Next = 2
resp, err := state1.Value, state1.Err resp, err := state1.Value, state1.Err
if err != nil { if err != nil {
@@ -266,14 +253,11 @@ func GetScoreCompiled() *io.PromiseImpl[float64] {
state2 = resp.TextCompiled() state2 = resp.TextCompiled()
state2.Exec = P.Exec state2.Exec = P.Exec
state2.Parent = P
state2.Call() state2.Call()
return return
case 2: case 2:
if !state2.Done() {
state2.Call()
return
}
P.Next = -1 P.Next = -1
body, err := state2.Value, state2.Err body, err := state2.Value, state2.Err
if err != nil { if err != nil {
@@ -319,18 +303,15 @@ func DoUpdateCompiled(op string) *io.PromiseImpl[io.Void] {
P.Debug = "DoUpdate" P.Debug = "DoUpdate"
P.Func = func(resolve func(io.Void, error)) { P.Func = func(resolve func(io.Void, error)) {
for { for {
switch P.Prev = P.Next; P.Prev { switch P.Next {
case 0: case 0:
P.Next = 1 P.Next = 1
state1 = AsyncHttpPostCompiled("http://example.com/update/" + op) state1 = AsyncHttpPostCompiled("http://example.com/update/" + op)
state1.Exec = P.Exec state1.Exec = P.Exec
state1.Parent = P
state1.Call() state1.Call()
return return
case 1: case 1:
if !state1.Done() {
state1.Call()
return
}
P.Next = -1 P.Next = -1
resp, err := state1.Value, state1.Err resp, err := state1.Value, state1.Err
if err != nil { if err != nil {
@@ -388,18 +369,15 @@ func DemoCompiled() *io.PromiseImpl[io.Void] {
P.Debug = "Demo" P.Debug = "Demo"
P.Func = func(resolve func(io.Void, error)) { P.Func = func(resolve func(io.Void, error)) {
for { for {
switch P.Prev = P.Next; P.Prev { switch P.Next {
case 0: case 0:
P.Next = 1 P.Next = 1
state1 = GetUserCompiled("1") state1 = GetUserCompiled("1")
state1.Exec = P.Exec state1.Exec = P.Exec
state1.Parent = P
state1.Call() state1.Call()
return return
case 1: case 1:
if !state1.Done() {
state1.Call()
return
}
P.Next = 2 P.Next = 2
user, err := state1.Value, state1.Err user, err := state1.Value, state1.Err
log.Printf("user: %v, err: %v\n", user, err) log.Printf("user: %v, err: %v\n", user, err)
@@ -407,14 +385,10 @@ func DemoCompiled() *io.PromiseImpl[io.Void] {
state2 = io.Race[User](GetUserCompiled("2"), GetUserCompiled("3"), GetUserCompiled("4")) state2 = io.Race[User](GetUserCompiled("2"), GetUserCompiled("3"), GetUserCompiled("4"))
log.Printf("state2: %v\n", state2) log.Printf("state2: %v\n", state2)
state2.Exec = P.Exec state2.Exec = P.Exec
state2.Parent = P
state2.Call() state2.Call()
return return
case 2: case 2:
if !state2.Done() {
state2.Call()
return
}
log.Printf("case 2, state2: %+v\n", state2) log.Printf("case 2, state2: %+v\n", state2)
P.Next = 3 P.Next = 3
@@ -424,13 +398,10 @@ func DemoCompiled() *io.PromiseImpl[io.Void] {
state3 = io.All[User]([]io.AsyncCall[User]{GetUserCompiled("5"), GetUserCompiled("6"), GetUserCompiled("7")}) state3 = io.All[User]([]io.AsyncCall[User]{GetUserCompiled("5"), GetUserCompiled("6"), GetUserCompiled("7")})
log.Printf("state3: %v\n", state3) log.Printf("state3: %v\n", state3)
state3.Exec = P.Exec state3.Exec = P.Exec
state3.Parent = P
state3.Call() state3.Call()
return return
case 3: case 3:
if !state3.Done() {
state3.Call()
return
}
P.Next = 4 P.Next = 4
users, err := state3.Value, state3.Err users, err := state3.Value, state3.Err
@@ -439,13 +410,10 @@ func DemoCompiled() *io.PromiseImpl[io.Void] {
state4 = io.Await3Compiled[User, float64, io.Void](GetUserCompiled("8"), GetScoreCompiled(), DoUpdateCompiled("update sth.")) state4 = io.Await3Compiled[User, float64, io.Void](GetUserCompiled("8"), GetScoreCompiled(), DoUpdateCompiled("update sth."))
log.Printf("state4: %v\n", state4) log.Printf("state4: %v\n", state4)
state4.Exec = P.Exec state4.Exec = P.Exec
state4.Parent = P
state4.Call() state4.Call()
return return
case 4: case 4:
if !state4.Done() {
state4.Call()
return
}
P.Next = -1 P.Next = -1
user, score, _, err := state4.Value.V1, state4.Value.V2, state4.Value.V3, state4.Value.Err user, score, _, err := state4.Value.V1, state4.Value.V2, state4.Value.V3, state4.Value.Err

View File

@@ -60,7 +60,7 @@ func Race[OutT any](acs ...AsyncCall[OutT]) *PromiseImpl[OutT] {
for _, ac := range acs { for _, ac := range acs {
ac := ac ac := ac
go func(ac AsyncCall[OutT]) { go func(ac AsyncCall[OutT]) {
v, err := Run(ac) v, err := Run[OutT](ac)
rc <- Result[OutT]{v, err} rc <- Result[OutT]{v, err}
}(ac) }(ac)
} }
@@ -94,7 +94,7 @@ func All[OutT any](acs []AsyncCall[OutT]) *PromiseImpl[[]Result[OutT]] {
ac := ac ac := ac
wg.Add(1) wg.Add(1)
go func(ac AsyncCall[OutT]) { go func(ac AsyncCall[OutT]) {
v, err := Run(ac) v, err := Run[OutT](ac)
ret[idx] = Result[OutT]{v, err} ret[idx] = Result[OutT]{v, err}
wg.Done() wg.Done()
}(ac) }(ac)
@@ -157,15 +157,15 @@ func Await3Compiled[OutT1, OutT2, OutT3 any](
go func() { go func() {
defer wg.Done() defer wg.Done()
ret.V1, ret.Err = Run(ac1) ret.V1, ret.Err = Run[OutT1](ac1)
}() }()
go func() { go func() {
defer wg.Done() defer wg.Done()
ret.V2, ret.Err = Run(ac2) ret.V2, ret.Err = Run[OutT2](ac2)
}() }()
go func() { go func() {
defer wg.Done() defer wg.Done()
ret.V3, ret.Err = Run(ac3) ret.V3, ret.Err = Run[OutT3](ac3)
}() }()
wg.Wait() wg.Wait()
if debugAsync { if debugAsync {

View File

@@ -35,23 +35,19 @@ type Void = [0]byte
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
type asyncCall interface { type asyncCall interface {
parent() asyncCall
Resume() Resume()
Call() Call()
Done() bool Done() bool
} }
type AsyncCall[OutT any] interface { type AsyncCall[OutT any] interface {
Call()
Await(timeout ...time.Duration) (ret OutT, err error)
Chan() <-chan OutT
Done() bool
} }
type executor struct { type executor struct {
ac asyncCall acs []asyncCall
mu sync.Mutex mu sync.Mutex
cond *sync.Cond cond *sync.Cond
susp bool
} }
func newExecutor() *executor { func newExecutor() *executor {
@@ -60,10 +56,10 @@ func newExecutor() *executor {
return e return e
} }
func (e *executor) Resume() { func (e *executor) schedule(ac asyncCall) {
e.mu.Lock() e.mu.Lock()
defer e.mu.Unlock() e.acs = append(e.acs, ac)
e.susp = false e.mu.Unlock()
e.cond.Signal() e.cond.Signal()
} }
@@ -71,18 +67,28 @@ func Run[OutT any](ac AsyncCall[OutT]) (OutT, error) {
e := newExecutor() e := newExecutor()
p := ac.(*PromiseImpl[OutT]) p := ac.(*PromiseImpl[OutT])
p.Exec = e p.Exec = e
var rootAc asyncCall = p
e.schedule(rootAc)
for { for {
e.mu.Lock() e.mu.Lock()
for e.susp { for len(e.acs) == 0 {
e.cond.Wait() e.cond.Wait()
} }
e.mu.Unlock() e.mu.Unlock()
e.susp = true ac := e.acs[0]
e.acs = e.acs[1:]
if ac.Done() { if ac.Done() {
return p.Value, p.Err if ac == rootAc {
return p.Value, p.Err
}
parent := ac.parent()
if parent != nil {
parent.Resume()
}
} else {
ac.Call()
} }
ac.Call()
} }
} }
@@ -110,10 +116,10 @@ func (p Promise[OutT]) Done() bool {
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
type PromiseImpl[TOut any] struct { type PromiseImpl[TOut any] struct {
Prev int Next int
Next int Exec *executor
Exec *executor Parent asyncCall
Debug string Debug string
Func func(resolve func(TOut, error)) Func func(resolve func(TOut, error))
Err error Err error
@@ -121,8 +127,15 @@ type PromiseImpl[TOut any] struct {
c chan TOut c chan TOut
} }
func (p *PromiseImpl[TOut]) parent() asyncCall {
return p.Parent
}
func (p *PromiseImpl[TOut]) Resume() { func (p *PromiseImpl[TOut]) Resume() {
p.Exec.Resume() if debugAsync {
log.Printf("Resume task: %+v\n", p)
}
p.Exec.schedule(p)
} }
func (p *PromiseImpl[TOut]) Done() bool { func (p *PromiseImpl[TOut]) Done() bool {