From d4a72bf661731f616cfd52acb43c01911ad80643 Mon Sep 17 00:00:00 2001 From: Li Jie Date: Tue, 3 Sep 2024 15:58:34 +0800 Subject: [PATCH] async.Run/Await/Race/All --- x/async/_demo/monad/monad.go | 139 +++++++++++++++++++++++ x/async/async.go | 145 ++++++++++++++++++++++++ x/async/executor.go | 59 ++++++++++ x/cbind/cbind.go | 87 +++++++++++++++ x/io/io.go | 210 +++++++++++++++-------------------- x/tuple/tuple.go | 40 +++++++ 6 files changed, 557 insertions(+), 123 deletions(-) create mode 100644 x/async/_demo/monad/monad.go create mode 100644 x/async/async.go create mode 100644 x/async/executor.go create mode 100644 x/cbind/cbind.go create mode 100644 x/tuple/tuple.go diff --git a/x/async/_demo/monad/monad.go b/x/async/_demo/monad/monad.go new file mode 100644 index 00000000..336e91ea --- /dev/null +++ b/x/async/_demo/monad/monad.go @@ -0,0 +1,139 @@ +package main + +import ( + "fmt" + "os" + "time" + + "github.com/goplus/llgo/c" + "github.com/goplus/llgo/x/async" + "github.com/goplus/llgo/x/async/timeout" + "github.com/goplus/llgo/x/tuple" +) + +func ReadFile(fileName string) async.IO[tuple.Tuple2[[]byte, error]] { + return async.Async(func(resolve func(tuple.Tuple2[[]byte, error])) { + go func() { + bytes, err := os.ReadFile(fileName) + resolve(tuple.T2(bytes, err)) + }() + }) +} + +func WriteFile(fileName string, content []byte) async.IO[error] { + return async.Async(func(resolve func(error)) { + go func() { + err := os.WriteFile(fileName, content, 0644) + resolve(err) + }() + }) +} + +func sleep(i int, d time.Duration) async.IO[int] { + return async.Async(func(resolve func(int)) { + go func() { + c.Usleep(c.Uint(d.Microseconds())) + resolve(i) + }() + }) +} + +func main() { + RunIO() + RunAllAndRace() + RunTimeout() + RunSocket() +} + +func RunIO() { + async.Run(func() { + content, err := async.Await(ReadFile("1.txt")).Get() + if err != nil { + fmt.Printf("read err: %v\n", err) + return + } + fmt.Printf("read content: %s\n", content) + err = async.Await(WriteFile("2.txt", content)) + if err != nil { + fmt.Printf("write err: %v\n", err) + return + } + fmt.Printf("write done\n") + }) + + // Translated to in Go+: + + async.Run(func() { + async.BindIO(ReadFile("1.txt"), func(v tuple.Tuple2[[]byte, error]) { + content, err := v.Get() + if err != nil { + fmt.Printf("read err: %v\n", err) + return + } + fmt.Printf("read content: %s\n", content) + async.BindIO(WriteFile("2.txt", content), func(v error) { + err = v + if err != nil { + fmt.Printf("write err: %v\n", err) + return + } + fmt.Printf("write done\n") + }) + }) + }) +} + +func RunAllAndRace() { + async.Run(func() { + all := async.All(sleep(1, time.Second), sleep(2, time.Second*2), sleep(3, time.Second*3)) + async.BindIO(all, func(v []int) { + fmt.Printf("All: %v\n", v) + }) + }) + + async.Run(func() { + first := async.Race(sleep(1, time.Second), sleep(2, time.Second*2), sleep(3, time.Second*3)) + v := async.Await(first) + fmt.Printf("Race: %v\n", v) + }) + + // Translated to in Go+: + + async.Run(func() { + all := async.All(sleep(1, time.Second), sleep(2, time.Second*2), sleep(3, time.Second*3)) + async.BindIO(all, func(v []int) { + fmt.Printf("All: %v\n", v) + }) + }) + + async.Run(func() { + first := async.Race(sleep(1, time.Second), sleep(2, time.Second*2), sleep(3, time.Second*3)) + async.BindIO(first, func(v int) { + fmt.Printf("Race: %v\n", v) + }) + }) +} + +func RunTimeout() { + async.Run(func() { + fmt.Printf("Start 1 second timeout\n") + async.Await(timeout.Timeout(1 * time.Second)) + fmt.Printf("timeout\n") + }) + + // Translated to in Go+: + + async.Run(func() { + fmt.Printf("Start 1 second timeout\n") + async.BindIO(timeout.Timeout(1*time.Second), func(async.Void) { + fmt.Printf("timeout\n") + }) + }) +} + +func RunSocket() { + // async.Run(func() { + // tcp := io.NewTcp() + // tcp. + // }) +} diff --git a/x/async/async.go b/x/async/async.go new file mode 100644 index 00000000..cdf1d43a --- /dev/null +++ b/x/async/async.go @@ -0,0 +1,145 @@ +/* + * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package async + +import ( + "context" + "unsafe" + _ "unsafe" + + "github.com/goplus/llgo/c/libuv" +) + +type Void = [0]byte + +type Future[T any] func() T + +type IO[T any] func(e *AsyncContext) Future[T] + +type Chain[T any] func(callback func(T)) + +func (f Future[T]) Do(callback func(T)) { + callback(f()) +} + +type AsyncContext struct { + context.Context + *Executor + Complete func() +} + +func Async[T any](fn func(resolve func(T))) IO[T] { + return func(ctx *AsyncContext) Future[T] { + var result T + var done bool + fn(func(t T) { + result = t + done = true + ctx.Complete() + }) + return func() T { + if !done { + panic("AsyncIO: Future accessed before completion") + } + return result + } + } +} + +type bindAsync struct { + libuv.Async + cb func() +} + +func BindIO[T any](call IO[T], callback func(T)) { + loop := Exec().L + a := &bindAsync{} + loop.Async(&a.Async, func(p *libuv.Async) { + (*bindAsync)(unsafe.Pointer(p)).cb() + }) + ctx := &AsyncContext{ + Context: context.Background(), + Executor: Exec(), + Complete: func() { + a.Async.Send() + }, + } + f := call(ctx) + a.cb = func() { + a.Async.Close(nil) + result := f() + callback(result) + } +} + +// ----------------------------------------------------------------------------- + +func Await[T1 any](call IO[T1]) (ret T1) { + ch := make(chan struct{}) + f := call(&AsyncContext{ + Context: context.Background(), + Executor: Exec(), + Complete: func() { + close(ch) + }, + }) + <-ch + return f() +} + +func Race[T1 any](calls ...IO[T1]) IO[T1] { + return Async(func(resolve func(T1)) { + done := false + for _, call := range calls { + var f Future[T1] + f = call(&AsyncContext{ + Context: context.Background(), + Executor: Exec(), + Complete: func() { + if done { + return + } + done = true + resolve(f()) + }, + }) + } + }) +} + +func All[T1 any](calls ...IO[T1]) IO[[]T1] { + return Async(func(resolve func([]T1)) { + n := len(calls) + results := make([]T1, n) + done := 0 + for i, call := range calls { + i := i + var f Future[T1] + f = call(&AsyncContext{ + Context: context.Background(), + Executor: Exec(), + Complete: func() { + results[i] = f() + done++ + if done == n { + resolve(results) + } + }, + }) + } + }) +} diff --git a/x/async/executor.go b/x/async/executor.go new file mode 100644 index 00000000..3306b541 --- /dev/null +++ b/x/async/executor.go @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package async + +import ( + "unsafe" + + "github.com/goplus/llgo/c/libuv" + "github.com/goplus/llgo/c/pthread" +) + +var execKey pthread.Key + +func init() { + execKey.Create(nil) +} + +type Executor struct { + L *libuv.Loop +} + +func Exec() *Executor { + v := execKey.Get() + if v == nil { + panic("async.Exec: no executor") + } + return (*Executor)(v) +} + +func setExec(e *Executor) { + execKey.Set(unsafe.Pointer(e)) +} + +func (e *Executor) Run() { + e.L.Run(libuv.RUN_DEFAULT) +} + +func Run(fn func()) { + loop := libuv.LoopNew() + exec := &Executor{loop} + setExec(exec) + fn() + exec.Run() + loop.Close() +} diff --git a/x/cbind/cbind.go b/x/cbind/cbind.go new file mode 100644 index 00000000..60e6d24a --- /dev/null +++ b/x/cbind/cbind.go @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cbind + +import "unsafe" + +type bind[Base any] struct { + b Base + fn func() +} + +type bind1[Base any, A any] struct { + b Base + fn func(A) +} + +type bind2[Base any, A any, B any] struct { + b Base + fn func(A, B) +} + +type bind3[Base any, A any, B any, C any] struct { + b Base + fn func(A, B, C) +} + +func callback[Base any](b *Base) { + bind := (*bind[Base])(unsafe.Pointer(b)) + bind.fn() +} + +func callback1[Base any, A any](b *Base, a A) { + bind := (*bind1[Base, A])(unsafe.Pointer(b)) + bind.fn(a) +} + +func callback2[Base any, A any, B any](b *Base, a A, c B) { + bind := (*bind2[Base, A, B])(unsafe.Pointer(b)) + bind.fn(a, c) +} + +func callback3[Base any, A any, B any, C any](b *Base, a A, c B, d C) { + bind := (*bind3[Base, A, B, C])(unsafe.Pointer(b)) + bind.fn(a, c, d) +} + +func Bind[T any](call func()) (p *T, fn func(*T)) { + bb := &bind[T]{fn: func() { call() }} + p = (*T)(unsafe.Pointer(bb)) + fn = callback[T] + return +} + +func Bind1[T any, A any](call func(A)) (p *T, fn func(*T, A)) { + bb := &bind1[T, A]{fn: func(a A) { call(a) }} + p = (*T)(unsafe.Pointer(bb)) + fn = callback1[T, A] + return +} + +func Bind2[T any, A any, B any](call func(A, B)) (p *T, fn func(*T, A, B)) { + bb := &bind2[T, A, B]{fn: func(a A, b B) { call(a, b) }} + p = (*T)(unsafe.Pointer(bb)) + fn = callback2[T, A, B] + return +} + +func Bind3[T any, A any, B any, C any](call func(A, B, C), a A, b B, c C) (p *T, fn func(*T, A, B, C)) { + bb := &bind3[T, A, B, C]{fn: func(a A, b B, c C) { call(a, b, c) }} + p = (*T)(unsafe.Pointer(bb)) + fn = callback3[T, A, B, C] + return +} diff --git a/x/io/io.go b/x/io/io.go index f4b4c18b..053da3bf 100644 --- a/x/io/io.go +++ b/x/io/io.go @@ -17,154 +17,118 @@ package io import ( + "unsafe" _ "unsafe" - "time" + "github.com/goplus/llgo/c" + "github.com/goplus/llgo/c/libuv" + "github.com/goplus/llgo/c/net" + "github.com/goplus/llgo/x/async" + "github.com/goplus/llgo/x/cbind" + "github.com/goplus/llgo/x/tuple" ) -const ( - LLGoPackage = "decl" -) - -type Void = [0]byte - -// ----------------------------------------------------------------------------- - -type AsyncCall[OutT any] interface { - Await(timeout ...time.Duration) (ret OutT, err error) - Chan() <-chan OutT - EnsureDone() +type Tcp struct { + tcp *libuv.Tcp } -// llgo:link AsyncCall.Await llgo.await -func Await[OutT any](call AsyncCall[OutT], timeout ...time.Duration) (ret OutT, err error) { - return +type libuvError libuv.Errno + +func (e libuvError) Error() string { + s := libuv.Strerror(libuv.Errno(e)) + return c.GoString(s, c.Strlen(s)) } -//go:linkname Timeout llgo.timeout -func Timeout(time.Duration) (ret AsyncCall[Void]) +func NewTcp() *Tcp { + t := &Tcp{&libuv.Tcp{}} + libuv.InitTcp(async.Exec().L, t.tcp) + return t +} -func TimeoutCompiled(d time.Duration) *PromiseImpl[Void] { - P := &PromiseImpl[Void]{} - P.Func = func(resolve func(Void, error)) { - go func() { - time.Sleep(d) - resolve(Void{}, nil) - }() +func (t *Tcp) Bind(addr *net.SockAddr, flags uint) error { + if res := t.tcp.Bind(addr, c.Uint(flags)); res != 0 { + return libuvError(res) } - return P -} - -// llgo:link Race llgo.race -func Race[OutT any](acs ...AsyncCall[OutT]) (ret *PromiseImpl[OutT]) { - return -} - -func All[OutT any](acs []AsyncCall[OutT]) (ret *PromiseImpl[[]OutT]) { return nil } -// llgo:link Await2 llgo.await -func Await2[OutT1, OutT2 any]( - ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], - timeout ...time.Duration) (ret1 OutT1, ret2 OutT2, err error) { - return -} - -type Await2Result[T1 any, T2 any] struct { - V1 T1 - V2 T2 - Err error -} - -func Await2Compiled[OutT1, OutT2 any]( - ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], - timeout ...time.Duration) (ret *PromiseImpl[Await2Result[OutT1, OutT2]]) { - return -} - -// llgo:link Await3 llgo.await -func Await3[OutT1, OutT2, OutT3 any]( - ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], ac3 AsyncCall[OutT3], - timeout ...time.Duration) (ret1 OutT1, ret2 OutT2, ret3 OutT3, err error) { - return -} - -type Await3Result[T1 any, T2 any, T3 any] struct { - V1 T1 - V2 T2 - V3 T3 - Err error -} - -func Await3Compiled[OutT1, OutT2, OutT3 any]( - ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], ac3 AsyncCall[OutT3], - timeout ...time.Duration) (ret *PromiseImpl[Await3Result[OutT1, OutT2, OutT3]]) { - return -} - -func Run(ac AsyncCall[Void]) { - p := ac.(*PromiseImpl[Void]) - p.Resume() - <-ac.Chan() -} - -// ----------------------------------------------------------------------------- - -type Promise[OutT any] func(OutT, error) - -// llgo:link Promise.Await llgo.await -func (p Promise[OutT]) Await(timeout ...time.Duration) (ret OutT, err error) { - return -} - -func (p Promise[OutT]) Chan() <-chan OutT { +func (t *Tcp) Listen(backlog int, cb libuv.ConnectionCb) error { + if res := (*libuv.Stream)(t.tcp).Listen(c.Int(backlog), cb); res != 0 { + return libuvError(res) + } return nil } -func (p Promise[OutT]) EnsureDone() { - +func (t *Tcp) Accept() (client *Tcp, err error) { + tcp := &libuv.Tcp{} + if res := libuv.InitTcp(async.Exec().L, tcp); res != 0 { + return nil, libuvError(res) + } + if res := (*libuv.Stream)(t.tcp).Accept((*libuv.Stream)(client.tcp)); res != 0 { + return nil, libuvError(res) + } + return &Tcp{tcp}, nil } -// ----------------------------------------------------------------------------- - -type PromiseImpl[TOut any] struct { - Func func(resolve func(TOut, error)) - Value TOut - Err error - Prev int - Next int - - c chan TOut -} - -func (p *PromiseImpl[TOut]) Resume() { - p.Func(func(v TOut, err error) { - p.Value = v - p.Err = err +func Connect(addr *net.SockAddr) async.IO[tuple.Tuple2[*Tcp, error]] { + return async.Async(func(resolve func(tuple.Tuple2[*Tcp, error])) { + tcp := &libuv.Tcp{} + if res := libuv.InitTcp(async.Exec().L, tcp); res != 0 { + resolve(tuple.T2[*Tcp, error]((*Tcp)(nil), libuvError(res))) + return + } + req, cb := cbind.Bind1[libuv.Connect](func(status c.Int) { + if status != 0 { + resolve(tuple.T2[*Tcp, error]((*Tcp)(nil), libuvError(libuv.Errno(status)))) + } else { + resolve(tuple.T2[*Tcp, error](&Tcp{tcp}, nil)) + } + }) + if res := libuv.TcpConnect(req, tcp, addr, cb); res != 0 { + resolve(tuple.T2[*Tcp, error]((*Tcp)(nil), libuvError(res))) + } }) } -func (p *PromiseImpl[TOut]) EnsureDone() { - if p.Next == -1 { - panic("Promise already done") - } +func allocBuffer(handle *libuv.Handle, suggestedSize uintptr, buf *libuv.Buf) { + buf.Base = (*c.Char)(c.Malloc(suggestedSize)) + buf.Len = suggestedSize } -func (p *PromiseImpl[TOut]) Chan() <-chan TOut { - if p.c == nil { - p.c = make(chan TOut, 1) - p.Func(func(v TOut, err error) { - p.Value = v - p.Err = err - p.c <- v +type slice struct { + data unsafe.Pointer + len int +} + +func goBytes(buf *int8, n int) []byte { + return *(*[]byte)(unsafe.Pointer(&slice{unsafe.Pointer(buf), n})) +} + +func (t *Tcp) Read() async.IO[tuple.Tuple2[[]byte, error]] { + return func(ctx *async.AsyncContext) async.Future[tuple.Tuple2[[]byte, error]] { + var result tuple.Tuple2[[]byte, error] + var done bool + tcp := (*libuv.Stream)(t.tcp) + libuv.ReadStart(tcp, allocBuffer, func(client *libuv.Stream, nread c.Long, buf *libuv.Buf) { + if nread > 0 { + result = tuple.T2[[]byte, error](goBytes(buf.Base, int(nread)), nil) + } else if nread < 0 { + result = tuple.T2[[]byte, error](nil, libuvError(libuv.Errno(nread))) + } else { + result = tuple.T2[[]byte, error](nil, nil) + } + done = true + ctx.Complete() }) + return func() tuple.Tuple2[[]byte, error] { + if !done { + panic("Tcp.Read: Future accessed before completion") + } + return result + } } - return p.c } -func (p *PromiseImpl[TOut]) Await(timeout ...time.Duration) (ret TOut, err error) { - panic("should not called") +func (t *Tcp) Close(cb libuv.CloseCb) { + (*libuv.Handle)(unsafe.Pointer(t.tcp)).Close(cb) } - -// ----------------------------------------------------------------------------- diff --git a/x/tuple/tuple.go b/x/tuple/tuple.go new file mode 100644 index 00000000..8613cc10 --- /dev/null +++ b/x/tuple/tuple.go @@ -0,0 +1,40 @@ +package tuple + +type Tuple[T any] struct { + V T +} + +func T[T any](v T) Tuple[T] { + return Tuple[T]{V: v} +} + +func (t Tuple[T]) Get() T { + return t.V +} + +type Tuple2[T1 any, T2 any] struct { + V1 T1 + V2 T2 +} + +func T2[T1 any, T2 any](v1 T1, v2 T2) Tuple2[T1, T2] { + return Tuple2[T1, T2]{V1: v1, V2: v2} +} + +func (t Tuple2[T1, T2]) Get() (T1, T2) { + return t.V1, t.V2 +} + +type Tuple3[T1 any, T2 any, T3 any] struct { + V1 T1 + V2 T2 + V3 T3 +} + +func T3[T1 any, T2 any, T3 any](v1 T1, v2 T2, v3 T3) Tuple3[T1, T2, T3] { + return Tuple3[T1, T2, T3]{V1: v1, V2: v2, V3: v3} +} + +func (t Tuple3[T1, T2, T3]) Get() (T1, T2, T3) { + return t.V1, t.V2, t.V3 +}