diff --git a/x/async/_demo/all/all.go b/x/async/_demo/all/all.go new file mode 100644 index 00000000..56e0c4b6 --- /dev/null +++ b/x/async/_demo/all/all.go @@ -0,0 +1,308 @@ +package main + +import ( + "fmt" + "os" + "time" + + "github.com/goplus/llgo/c" + "github.com/goplus/llgo/c/net" + "github.com/goplus/llgo/x/async" + "github.com/goplus/llgo/x/async/timeout" + "github.com/goplus/llgo/x/io" + "github.com/goplus/llgo/x/tuple" +) + +func ReadFile(fileName string) async.Future[tuple.Tuple2[[]byte, error]] { + return async.Async(func(resolve func(tuple.Tuple2[[]byte, error])) { + go func() { + println(async.Gettid(), "read file", fileName) + bytes, err := os.ReadFile(fileName) + resolve(tuple.T2(bytes, err)) + }() + }) +} + +func WriteFile(fileName string, content []byte) async.Future[error] { + return async.Async(func(resolve func(error)) { + go func() { + err := os.WriteFile(fileName, content, 0644) + resolve(err) + }() + }) +} + +func sleep(i int, d time.Duration) async.Future[int] { + return async.Async(func(resolve func(int)) { + timeout.Timeout(d)(func(async.Void) { + resolve(i) + }) + }) +} + +func main() { + RunIO() + RunAllAndRace() + RunTimeout() + RunSocket() +} + +func RunIO() { + println("RunIO with Await") + + async.Run(async.Async(func(resolve func(async.Void)) { + println("read file") + defer resolve(async.Void{}) + content, err := async.Await(ReadFile("1.txt")).Get() + if err != nil { + fmt.Printf("read err: %v\n", err) + return + } + fmt.Printf("read content: %s\n", content) + err = async.Await(WriteFile("2.txt", content)) + if err != nil { + fmt.Printf("write err: %v\n", err) + return + } + fmt.Printf("write done\n") + })) + + // Translated Await to BindIO in Go+: + println("RunIO with BindIO") + + async.Run(async.Async(func(resolve func(async.Void)) { + ReadFile("1.txt")(func(v tuple.Tuple2[[]byte, error]) { + content, err := v.Get() + if err != nil { + fmt.Printf("read err: %v\n", err) + resolve(async.Void{}) + return + } + fmt.Printf("read content: %s\n", content) + WriteFile("2.txt", content)(func(v error) { + err = v + if err != nil { + fmt.Printf("write err: %v\n", err) + resolve(async.Void{}) + return + } + println("write done") + resolve(async.Void{}) + }) + }) + })) +} + +func RunAllAndRace() { + ms100 := 100 * time.Millisecond + ms200 := 200 * time.Millisecond + ms300 := 300 * time.Millisecond + + println("Run All with Await") + + async.Run(async.Async(func(resolve func(async.Void)) { + async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300))(func(v []int) { + fmt.Printf("All: %v\n", v) + resolve(async.Void{}) + }) + })) + + println("Run Race with Await") + + async.Run(async.Async(func(resolve func(async.Void)) { + first := async.Race(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)) + v := async.Await(first) + fmt.Printf("Race: %v\n", v) + resolve(async.Void{}) + })) + + // Translated to in Go+: + + println("Run All with BindIO") + + async.Run(async.Async(func(resolve func(async.Void)) { + async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300))(func(v []int) { + fmt.Printf("All: %v\n", v) + resolve(async.Void{}) + }) + })) + + println("Run Race with BindIO") + + async.Run(async.Async(func(resolve func(async.Void)) { + async.Race(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300))(func(v int) { + fmt.Printf("Race: %v\n", v) + resolve(async.Void{}) + }) + })) +} + +func RunTimeout() { + println("Run Timeout with Await") + + async.Run(async.Async(func(resolve func(async.Void)) { + fmt.Printf("Start 100 ms timeout\n") + async.Await(timeout.Timeout(100 * time.Millisecond)) + fmt.Printf("timeout\n") + resolve(async.Void{}) + })) + + // Translated to in Go+: + + println("Run Timeout with BindIO") + + async.Run(async.Async(func(resolve func(async.Void)) { + fmt.Printf("Start 100 ms timeout\n") + timeout.Timeout(100 * time.Millisecond)(func(async.Void) { + fmt.Printf("timeout\n") + resolve(async.Void{}) + }) + })) +} + +func RunSocket() { + println("Run Socket") + + async.Run(async.Async(func(resolve func(async.Void)) { + println("RunServer") + + RunServer()(func(async.Void) { + println("RunServer done") + resolve(async.Void{}) + }) + + println("RunClient") + + RunClient()(func(async.Void) { + println("RunClient done") + resolve(async.Void{}) + }) + })) +} + +func RunClient() async.Future[async.Void] { + return async.Async(func(resolve func(async.Void)) { + bindAddr := "127.0.0.1:3927" + io.ParseAddr(bindAddr)(func(v tuple.Tuple2[*net.SockAddr, error]) { + addr, err := v.Get() + println("Connect to", addr, err) + if err != nil { + panic(err) + } + io.Connect(addr)(func(v tuple.Tuple2[*io.Tcp, error]) { + client, err := v.Get() + println("Connected", client, err) + if err != nil { + panic(err) + } + var loop func(client *io.Tcp) + loop = func(client *io.Tcp) { + client.Write([]byte("Hello"))(func(err error) { + if err != nil { + panic(err) + } + client.Read()(func(v tuple.Tuple2[[]byte, error]) { + data, err := v.Get() + if err != nil { + panic(err) + } + println("Read:", string(data)) + timeout.Timeout(1 * time.Second)(func(async.Void) { + loop(client) + }) + }) + }) + } + loop(client) + }) + }) + }) +} + +func RunServer() async.Future[async.Void] { + return async.Async(func(resolve func(async.Void)) { + server, err := io.NewTcp() + if err != nil { + panic(err) + } + + bindAddr := "0.0.0.0:3927" + io.ParseAddr(bindAddr)(func(v tuple.Tuple2[*net.SockAddr, error]) { + addr, err := v.Get() + if err != nil { + panic(err) + } + + if err = server.Bind(addr, 0); err != nil { + panic(err) + } + c.Printf(c.Str("Listening on %s\n"), c.AllocaCStr(bindAddr)) + + err = server.Listen(128, func(server *io.Tcp, err error) { + if err != nil { + panic(err) + } + client, err := server.Accept() + println("Accept", client, err) + + var loop func(client *io.Tcp) + loop = func(client *io.Tcp) { + client.Read()(func(v tuple.Tuple2[[]byte, error]) { + data, err := v.Get() + if err != nil { + println("Read error", err) + } else { + println("Read:", string(data)) + client.Write(data)(func(err error) { + if err != nil { + println("Write error", err) + } else { + println("Write done") + loop(client) + } + }) + } + }) + } + loop(client) + }) + if err != nil { + panic(err) + } + }) + }) +} + +func RunServer1() async.Future[async.Void] { + return async.Async(func(resolve func(async.Void)) { + io.Listen("tcp", "0.0.0.0:3927")(func(v tuple.Tuple2[*io.Tcp, error]) { + server, err := v.Get() + if err != nil { + panic(err) + } + client, err := server.Accept() + println("Accept", client, err) + + var loop func(client *io.Tcp) + loop = func(client *io.Tcp) { + client.Read()(func(v tuple.Tuple2[[]byte, error]) { + data, err := v.Get() + if err != nil { + println("Read error", err) + } else { + println("Read:", string(data)) + client.Write(data)(func(err error) { + if err != nil { + println("Write error", err) + } else { + println("Write done") + loop(client) + } + }) + } + }) + } + loop(client) + }) + }) +} diff --git a/x/async/_demo/monad/monad.go b/x/async/_demo/monad/monad.go deleted file mode 100644 index 4e326932..00000000 --- a/x/async/_demo/monad/monad.go +++ /dev/null @@ -1,156 +0,0 @@ -package main - -import ( - "fmt" - "os" - "time" - - "github.com/goplus/llgo/x/async" - "github.com/goplus/llgo/x/async/timeout" - "github.com/goplus/llgo/x/tuple" -) - -func ReadFile(fileName string) async.IO[tuple.Tuple2[[]byte, error]] { - return async.Async(func(resolve func(tuple.Tuple2[[]byte, error])) { - go func() { - bytes, err := os.ReadFile(fileName) - resolve(tuple.T2(bytes, err)) - }() - }) -} - -func WriteFile(fileName string, content []byte) async.IO[error] { - return async.Async(func(resolve func(error)) { - go func() { - err := os.WriteFile(fileName, content, 0644) - resolve(err) - }() - }) -} - -func sleep(i int, d time.Duration) async.IO[int] { - return async.Async(func(resolve func(int)) { - async.BindIO(timeout.Timeout(d), func(async.Void) { - resolve(i) - }) - }) -} - -func main() { - RunIO() - RunAllAndRace() - RunTimeout() - RunSocket() -} - -func RunIO() { - println("RunIO with Await") - - async.Run(func() { - content, err := async.Await(ReadFile("1.txt")).Get() - if err != nil { - fmt.Printf("read err: %v\n", err) - return - } - fmt.Printf("read content: %s\n", content) - err = async.Await(WriteFile("2.txt", content)) - if err != nil { - fmt.Printf("write err: %v\n", err) - return - } - fmt.Printf("write done\n") - }) - - // Translated to in Go+: - println("RunIO with BindIO") - - async.Run(func() { - async.BindIO(ReadFile("1.txt"), func(v tuple.Tuple2[[]byte, error]) { - content, err := v.Get() - if err != nil { - fmt.Printf("read err: %v\n", err) - return - } - fmt.Printf("read content: %s\n", content) - async.BindIO(WriteFile("2.txt", content), func(v error) { - err = v - if err != nil { - fmt.Printf("write err: %v\n", err) - return - } - fmt.Printf("write done\n") - }) - }) - }) -} - -func RunAllAndRace() { - ms100 := 100 * time.Millisecond - ms200 := 200 * time.Millisecond - ms300 := 300 * time.Millisecond - - println("Run All with Await") - - async.Run(func() { - all := async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)) - async.BindIO(all, func(v []int) { - fmt.Printf("All: %v\n", v) - }) - }) - - println("Run Race with Await") - - async.Run(func() { - first := async.Race(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)) - v := async.Await(first) - fmt.Printf("Race: %v\n", v) - }) - - // Translated to in Go+: - - println("Run All with BindIO") - - async.Run(func() { - all := async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)) - async.BindIO(all, func(v []int) { - fmt.Printf("All: %v\n", v) - }) - }) - - println("Run Race with BindIO") - - async.Run(func() { - first := async.Race(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)) - async.BindIO(first, func(v int) { - fmt.Printf("Race: %v\n", v) - }) - }) -} - -func RunTimeout() { - println("Run Timeout with Await") - - async.Run(func() { - fmt.Printf("Start 100 ms timeout\n") - async.Await(timeout.Timeout(100 * time.Millisecond)) - fmt.Printf("timeout\n") - }) - - // Translated to in Go+: - - println("Run Timeout with BindIO") - - async.Run(func() { - fmt.Printf("Start 100 ms timeout\n") - async.BindIO(timeout.Timeout(100*time.Millisecond), func(async.Void) { - fmt.Printf("timeout\n") - }) - }) -} - -func RunSocket() { - // async.Run(func() { - // tcp := io.NewTcp() - // tcp. - // }) -} diff --git a/x/async/async.go b/x/async/async.go index 8f7f0d64..b1d4f220 100644 --- a/x/async/async.go +++ b/x/async/async.go @@ -17,38 +17,45 @@ package async import ( + "unsafe" _ "unsafe" + + "github.com/goplus/llgo/c/libuv" ) type Void = [0]byte -type Future[T any] func() T +type Future[T any] func(func(T)) -type IO[T any] func(e *AsyncContext) Future[T] - -type AsyncContext struct { - *Executor - complete func() +type asyncBind[T any] struct { + libuv.Async + result T + chain func(T) } -func (ctx *AsyncContext) Complete() { - ctx.complete() +func asyncCb[T any](a *libuv.Async) { + a.Close(nil) + aa := (*asyncBind[T])(unsafe.Pointer(a)) + aa.chain(aa.result) } -func Async[T any](fn func(resolve func(T))) IO[T] { - return func(ctx *AsyncContext) Future[T] { - var result T - var done bool - fn(func(t T) { - result = t - done = true - ctx.Complete() +func Async[T any](fn func(func(T))) Future[T] { + return func(chain func(T)) { + loop := Exec().L + // var result T + // var a *libuv.Async + // var cb libuv.AsyncCb + // a, cb = cbind.BindF[libuv.Async, libuv.AsyncCb](func() { + // a.Close(nil) + // chain(result) + // }) + // loop.Async(a, cb) + + aa := &asyncBind[T]{chain: chain} + loop.Async(&aa.Async, asyncCb[T]) + fn(func(v T) { + aa.result = v + aa.Send() }) - return func() T { - if !done { - panic("async.Async: Future accessed before completion") - } - return result - } } } diff --git a/x/async/async_go.go b/x/async/async_go.go index 77d0cada..cec40669 100644 --- a/x/async/async_go.go +++ b/x/async/async_go.go @@ -1,5 +1,5 @@ -//go:build !llgo -// +build !llgo +//go:build llgo11 +// +build llgo11 /* * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. diff --git a/x/async/async_llgo.go b/x/async/async_llgo.go index 5b63434a..1c1b0cb6 100644 --- a/x/async/async_llgo.go +++ b/x/async/async_llgo.go @@ -1,6 +1,3 @@ -//go:build llgo -// +build llgo - /* * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. * @@ -21,92 +18,39 @@ package async import ( "sync/atomic" - "unsafe" - - "github.com/goplus/llgo/c/libuv" ) -type bindAsync struct { - libuv.Async - cb func() -} - -func BindIO[T any](call IO[T], callback func(T)) { - loop := Exec().L - a := &bindAsync{} - loop.Async(&a.Async, func(p *libuv.Async) { - (*bindAsync)(unsafe.Pointer(p)).cb() - }) - done := atomic.Bool{} - ctx := &AsyncContext{ - Executor: Exec(), - complete: func() { - done.Store(true) - a.Async.Send() - }, - } - f := call(ctx) - called := false - a.cb = func() { - if called { - return - } - a.Async.Close(nil) - result := f() - callback(result) - } - // don't delay the callback if the future is already done - if done.Load() { - called = true - a.cb() - } -} - -func Await[T1 any](call IO[T1]) (ret T1) { - BindIO(call, func(v T1) { - ret = v - }) - return +func Await[T1 any](call Future[T1]) (ret T1) { + return Run(call) } // ----------------------------------------------------------------------------- -func Race[T1 any](calls ...IO[T1]) IO[T1] { +func Race[T1 any](futures ...Future[T1]) Future[T1] { return Async(func(resolve func(T1)) { - done := false - for _, call := range calls { - var f Future[T1] - f = call(&AsyncContext{ - Executor: Exec(), - complete: func() { - if done { - return - } - done = true - resolve(f()) - }, + done := atomic.Bool{} + for _, future := range futures { + future(func(v T1) { + if !done.Swap(true) { + resolve(v) + } }) } }) } -func All[T1 any](calls ...IO[T1]) IO[[]T1] { +func All[T1 any](futures ...Future[T1]) Future[[]T1] { return Async(func(resolve func([]T1)) { - n := len(calls) + n := len(futures) results := make([]T1, n) - done := 0 - for i, call := range calls { + var done uint32 + for i, future := range futures { i := i - var f Future[T1] - f = call(&AsyncContext{ - Executor: Exec(), - complete: func() { - results[i] = f() - done++ - if done == n { - resolve(results) - } - }, + future(func(v T1) { + results[i] = v + if atomic.AddUint32(&done, 1) == uint32(n) { + resolve(results) + } }) } }) diff --git a/x/async/executor_go.go b/x/async/executor_go.go index 95151eeb..8ef70dc3 100644 --- a/x/async/executor_go.go +++ b/x/async/executor_go.go @@ -1,5 +1,5 @@ -//go:build !llgo -// +build !llgo +//go:build llgo11 +// +build llgo11 /* * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. diff --git a/x/async/executor_llgo.go b/x/async/executor_llgo.go index 10611185..3018cc3a 100644 --- a/x/async/executor_llgo.go +++ b/x/async/executor_llgo.go @@ -1,6 +1,3 @@ -//go:build llgo -// +build llgo - /* * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. * @@ -22,10 +19,14 @@ package async import ( "unsafe" + "github.com/goplus/llgo/c" "github.com/goplus/llgo/c/libuv" "github.com/goplus/llgo/c/pthread" ) +//go:linkname Gettid C.pthread_self +func Gettid() c.Pointer + var execKey pthread.Key func init() { @@ -44,20 +45,25 @@ func Exec() *Executor { return (*Executor)(v) } -func setExec(e *Executor) { +func setExec(e *Executor) (old *Executor) { + old = (*Executor)(execKey.Get()) execKey.Set(unsafe.Pointer(e)) + return } func (e *Executor) Run() { e.L.Run(libuv.RUN_DEFAULT) } -func Run(fn func()) { +func Run[T any](future Future[T]) (ret T) { loop := libuv.LoopNew() exec := &Executor{loop} - setExec(exec) - fn() + oldExec := setExec(exec) + future(func(v T) { + ret = v + }) exec.Run() loop.Close() - setExec(nil) + setExec(oldExec) + return } diff --git a/x/async/timeout/timeout_go.go b/x/async/timeout/timeout_go.go index b58121f2..2315c9a3 100644 --- a/x/async/timeout/timeout_go.go +++ b/x/async/timeout/timeout_go.go @@ -1,5 +1,5 @@ -//go:build !llgo -// +build !llgo +//go:build llgo11 +// +build llgo11 /* * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. diff --git a/x/async/timeout/timeout_llgo.go b/x/async/timeout/timeout_llgo.go index 9532c413..0bc07ab3 100644 --- a/x/async/timeout/timeout_llgo.go +++ b/x/async/timeout/timeout_llgo.go @@ -1,6 +1,3 @@ -//go:build llgo -// +build llgo - /* * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. * @@ -27,16 +24,16 @@ import ( "github.com/goplus/llgo/x/cbind" ) -func Timeout(d time.Duration) async.IO[async.Void] { +func Timeout(d time.Duration) async.Future[async.Void] { return async.Async(func(resolve func(async.Void)) { - t, _ := cbind.BindF[libuv.Timer, libuv.TimerCb](func() { + t, cb := cbind.BindF[libuv.Timer, libuv.TimerCb](func(t *libuv.Timer) { resolve(async.Void{}) }) r := libuv.InitTimer(async.Exec().L, t) if r != 0 { panic("InitTimer failed") } - r = t.Start(cbind.Callback[libuv.Timer], uint64(d/time.Millisecond), 0) + r = t.Start(cb, uint64(d/time.Millisecond), 0) if r != 0 { panic("Start failed") } diff --git a/x/cbind/buf.go b/x/cbind/buf.go index 3e1f2e67..b142d6d3 100644 --- a/x/cbind/buf.go +++ b/x/cbind/buf.go @@ -10,3 +10,7 @@ type slice struct { func GoBytes(buf *int8, n int) []byte { return *(*[]byte)(unsafe.Pointer(&slice{unsafe.Pointer(buf), n})) } + +func CBuffer(data []byte) (*int8, int) { + return (*int8)(unsafe.Pointer(&data[0])), len(data) +} diff --git a/x/cbind/cbind.go b/x/cbind/cbind.go index 47b68d2f..7f267428 100644 --- a/x/cbind/cbind.go +++ b/x/cbind/cbind.go @@ -52,24 +52,24 @@ type bind3[Base any, A any, B any, C any] struct { fn func(A, B, C) } -func Callback[Base any](b *Base) { - bind := (*bind[Base])(unsafe.Pointer(b)) +func Callback[Base any](base *Base) { + bind := (*bind[Base])(unsafe.Pointer(base)) bind.fn() } -func Callback1[Base any, A any](b *Base, a A) { - bind := (*bind1[Base, A])(unsafe.Pointer(b)) +func Callback1[Base any, A any](base *Base, a A) { + bind := (*bind1[Base, A])(unsafe.Pointer(base)) bind.fn(a) } -func Callback2[Base any, A any, B any](b *Base, a A, c B) { - bind := (*bind2[Base, A, B])(unsafe.Pointer(b)) - bind.fn(a, c) +func Callback2[Base any, A any, B any](base *Base, a A, b B) { + bind := (*bind2[Base, A, B])(unsafe.Pointer(base)) + bind.fn(a, b) } -func Callback3[Base any, A any, B any, C any](b *Base, a A, c B, d C) { - bind := (*bind3[Base, A, B, C])(unsafe.Pointer(b)) - bind.fn(a, c, d) +func Callback3[Base any, A any, B any, C any](base *Base, a A, b B, c C) { + bind := (*bind3[Base, A, B, C])(unsafe.Pointer(base)) + bind.fn(a, b, c) } /** @@ -107,8 +107,8 @@ func Bind[T any](call func()) (p *T, cb Cb[T]) { func BindF[T any, F ~func(*T)](call func()) (*T, F) { bb := &bind[T]{fn: call} p := (*T)(unsafe.Pointer(bb)) - fn := Callback[T] - return p, *(*F)(unsafe.Pointer(&fn)) + var fn F = Callback[T] + return p, fn } func Bind1[T any, A any](call func(A)) (p *T, cb Cb1[T, A]) { @@ -118,11 +118,11 @@ func Bind1[T any, A any](call func(A)) (p *T, cb Cb1[T, A]) { return } -func Bind1F[T any, A any, F ~func(A)](call func(A)) (*T, F) { +func Bind1F[T any, F ~func(*T, A), A any](call func(A)) (*T, F) { bb := &bind1[T, A]{fn: call} p := (*T)(unsafe.Pointer(bb)) - fn := Callback1[T, A] - return p, *(*F)(unsafe.Pointer(&fn)) + var fn F = Callback1[T, A] + return p, fn } func Bind2[T any, A any, B any](call func(A, B)) (p *T, cb Cb2[T, A, B]) { @@ -132,11 +132,11 @@ func Bind2[T any, A any, B any](call func(A, B)) (p *T, cb Cb2[T, A, B]) { return } -func Bind2F[T any, A any, B any, F ~func(A, B)](call func(A, B)) (*T, F) { +func Bind2F[T any, F ~func(*T, A, B), A any, B any](call func(A, B)) (*T, F) { bb := &bind2[T, A, B]{fn: call} p := (*T)(unsafe.Pointer(bb)) - fn := Callback2[T, A, B] - return p, *(*F)(unsafe.Pointer(&fn)) + var fn F = Callback2[T, A, B] + return p, fn } func Bind3[T any, A any, B any, C any](call func(A, B, C), a A, b B, c C) (p *T, cb Cb3[T, A, B, C]) { @@ -146,9 +146,9 @@ func Bind3[T any, A any, B any, C any](call func(A, B, C), a A, b B, c C) (p *T, return } -func Bind3F[T any, A any, B any, C any, F ~func(A, B, C)](call func(A, B, C), a A, b B, c C) (*T, F) { +func Bind3F[T any, F ~func(*T, A, B, C), A any, B any, C any](call func(A, B, C), a A, b B, c C) (*T, F) { bb := &bind3[T, A, B, C]{fn: call} p := (*T)(unsafe.Pointer(bb)) - fn := Callback3[T, A, B, C] - return p, *(*F)(unsafe.Pointer(&fn)) + var fn F = Callback3[T, A, B, C] + return p, fn } diff --git a/x/io/README.md b/x/io/README.md index 9681b3ab..da6a4c8e 100644 --- a/x/io/README.md +++ b/x/io/README.md @@ -367,98 +367,23 @@ In some situations, you may want to get the first result of multiple async opera ## Design -Introduce `Promise` type to represent an asynchronous operation and its resulting value. `Promise` can be resolved with a value with an error. `Promise` can be awaited to get the value and error. - -`Promise` just a type indicating the asynchronous operation, it can't be created and assigned directly. It be replaced to `PromiseImpl` by the LLGo compiler. +Introduce `async.IO[T]` type to represent an asynchronous operation, `async.Future[T]` type to represent the result of an asynchronous operation. `async.IO[T]` can be `bind` to a function that accepts `T` as an argument to chain multiple asynchronous operations. `async.IO[T]` can be `await` to get the value of the asynchronous operation. ```go -// Some native async functions -func timeoutAsync(d time.Duration, cb func()) { - go func() { - time.Sleep(d) - cb() - }() -} +package async -// Wrap callback-based async function into Promise -func resolveAfter1Second() (resolve Promise[string]) { - timeoutAsync(1 * time.Second, func() { - resolve("Resolved after 1 second", nil) - }) -} +type Future[T any] func() T +type IO[T any] func() Future[T] -// Compiled to: -func resolveAfter1Second() (resolve PromiseImpl[string]) { - promise := io.NewPromiseImpl[string](resolve func(value string, err error) { - resolve: func(value string, err error) { - for true { - switch (promise.prev = promise.next) { - case 0: - timeoutAsync(1 * time.Second, func() { - resolve("Resolved after 1 second", nil) - }) - } - } - }, - } - return promise -} - -func asyncCall() (resolve Promise[string]) { - str, err := resolveAfter1Second().Await() - resolve("AsyncCall: " + str, err) -} - -// Compiled to: -func asyncCall() (resolve PromiseImpl[string]) { - promise := io.NewPromiseImpl[string](resolve func(value string, err error) { - for true { - switch (promise.prev = promise.next) { - case 0: - resolveAfter1Second() - return - case 1: - str, err := promise.value, promise.err - resolve("AsyncCall: " + str, err) - return - } +func main() { + io := func() Future[string] { + return func() string { + return "Hello, World!" } - }) - return promise -} + } -// Directly return Promise -func asyncCall2() Promise[string] { - return resolveAfter1Second() -} - -// Compiled to: -func asyncCall2() PromiseImpl[string] { - return resolveAfter1Second() -} - -// Don't wait for Promise to complete -func asyncCall3() { - resolveAfter1Second().Then(func(result string) { - fmt.Println("AsyncCall3: " + result) - }) -} - -func asyncMain() { - fmt.Println("Starting AsyncCall") - result1 := asyncCall().Await() - fmt.Println(result1) - - fmt.Println("Starting AsyncCall2") - result2 := asyncCall2().Await() - fmt.Println(result2) - - fmt.Println("Starting AsyncCall3") - asyncCall3() - - // Wait for AsyncCall3 to complete - time.Sleep(2 * time.Second) - - fmt.Println("Main function completed") + future := io() + value := future() + println(value) } ``` diff --git a/x/io/io.go b/x/io/io.go deleted file mode 100644 index bc866b68..00000000 --- a/x/io/io.go +++ /dev/null @@ -1,127 +0,0 @@ -//go:build llgo -// +build llgo - -/* - * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io - -import ( - "unsafe" - _ "unsafe" - - "github.com/goplus/llgo/c" - "github.com/goplus/llgo/c/libuv" - "github.com/goplus/llgo/c/net" - "github.com/goplus/llgo/x/async" - "github.com/goplus/llgo/x/cbind" - "github.com/goplus/llgo/x/tuple" -) - -type Tcp struct { - tcp libuv.Tcp -} - -type libuvError libuv.Errno - -func (e libuvError) Error() string { - s := libuv.Strerror(libuv.Errno(e)) - return c.GoString(s, c.Strlen(s)) -} - -func NewTcp() *Tcp { - t := &Tcp{} - libuv.InitTcp(async.Exec().L, &t.tcp) - return t -} - -func (t *Tcp) Bind(addr *net.SockAddr, flags uint) error { - if res := t.tcp.Bind(addr, c.Uint(flags)); res != 0 { - return libuvError(res) - } - return nil -} - -func (t *Tcp) Listen(backlog int, cb func(server *libuv.Stream, status c.Int)) error { - if res := (*libuv.Stream)(&t.tcp).Listen(c.Int(backlog), cb); res != 0 { - return libuvError(res) - } - return nil -} - -func (t *Tcp) Accept() (client *Tcp, err error) { - tcp := &Tcp{} - if res := libuv.InitTcp(async.Exec().L, &tcp.tcp); res != 0 { - return nil, libuvError(res) - } - if res := (*libuv.Stream)(&t.tcp).Accept((*libuv.Stream)(&tcp.tcp)); res != 0 { - return nil, libuvError(res) - } - return tcp, nil -} - -func Connect(addr *net.SockAddr) async.IO[tuple.Tuple2[*Tcp, error]] { - return async.Async(func(resolve func(tuple.Tuple2[*Tcp, error])) { - tcp := &Tcp{} - if res := libuv.InitTcp(async.Exec().L, &tcp.tcp); res != 0 { - resolve(tuple.T2[*Tcp, error]((*Tcp)(nil), libuvError(res))) - return - } - req, _ := cbind.Bind1[libuv.Connect](func(status c.Int) { - if status != 0 { - resolve(tuple.T2[*Tcp, error]((*Tcp)(nil), libuvError(libuv.Errno(status)))) - } - }) - if res := libuv.TcpConnect(req, &tcp.tcp, addr, cbind.Callback1[libuv.Connect, c.Int]); res != 0 { - resolve(tuple.T2[*Tcp, error]((*Tcp)(nil), libuvError(res))) - } - resolve(tuple.T2[*Tcp, error](tcp, nil)) - }) -} - -func allocBuffer(handle *libuv.Handle, suggestedSize uintptr, buf *libuv.Buf) { - buf.Base = (*c.Char)(c.Malloc(suggestedSize)) - buf.Len = suggestedSize -} - -func (t *Tcp) Read() async.IO[tuple.Tuple2[[]byte, error]] { - return func(ctx *async.AsyncContext) async.Future[tuple.Tuple2[[]byte, error]] { - var result tuple.Tuple2[[]byte, error] - var done bool - tcp := (*libuv.Stream)(&t.tcp) - tcp.StartRead(allocBuffer, func(client *libuv.Stream, nread c.Long, buf *libuv.Buf) { - if nread > 0 { - result = tuple.T2[[]byte, error](cbind.GoBytes(buf.Base, int(nread)), nil) - } else if nread < 0 { - result = tuple.T2[[]byte, error](nil, libuvError(libuv.Errno(nread))) - } else { - result = tuple.T2[[]byte, error](nil, nil) - } - done = true - ctx.Complete() - }) - return func() tuple.Tuple2[[]byte, error] { - if !done { - panic("Tcp.Read: Future accessed before completion") - } - return result - } - } -} - -func (t *Tcp) Close() { - (*libuv.Handle)(unsafe.Pointer(&t.tcp)).Close(nil) -} diff --git a/x/io/io_llgo.go b/x/io/io_llgo.go new file mode 100644 index 00000000..a598bea8 --- /dev/null +++ b/x/io/io_llgo.go @@ -0,0 +1,291 @@ +/* + * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io + +import ( + "strings" + "syscall" + "unsafe" + _ "unsafe" + + "github.com/goplus/llgo/c" + "github.com/goplus/llgo/c/libuv" + "github.com/goplus/llgo/c/net" + "github.com/goplus/llgo/x/async" + "github.com/goplus/llgo/x/cbind" + "github.com/goplus/llgo/x/tuple" +) + +type Tcp struct { + tcp libuv.Tcp + listenCb func(server *Tcp, err error) + readCb func([]byte, error) + writeCb func(int, error) +} + +type libuvError libuv.Errno + +func (e libuvError) Error() string { + s := libuv.Strerror(libuv.Errno(e)) + return c.GoString(s, c.Strlen(s)) +} + +type getAddrInfoBind struct { + libuv.GetAddrInfo + resolve func(tuple.Tuple2[*net.SockAddr, error]) +} + +func getAddrInfoCb(p *libuv.GetAddrInfo, status c.Int, addr *net.AddrInfo) { + bind := (*getAddrInfoBind)(unsafe.Pointer(p)) + if status != 0 { + bind.resolve(tuple.T2[*net.SockAddr, error](nil, libuvError(status))) + return + } + bind.resolve(tuple.T2[*net.SockAddr, error](addr.Addr, nil)) +} + +func ParseAddr(addr string) async.Future[tuple.Tuple2[*net.SockAddr, error]] { + return async.Async(func(resolve func(tuple.Tuple2[*net.SockAddr, error])) { + host := "127.0.0.1" + var port string + // split host and service by last colon + idx := strings.LastIndex(addr, ":") + if idx < 0 { + port = addr + } else { + host = addr[:idx] + port = addr[idx+1:] + } + + hints := &net.AddrInfo{ + Family: net.AF_INET, + SockType: net.SOCK_STREAM, + Protocol: syscall.IPPROTO_TCP, + Flags: 0, + } + + // TODO(lijie): closure problem, instead with a struct to hold the resolve function. + // req, cb := cbind.Bind2F[libuv.GetAddrInfo, libuv.GetaddrinfoCb](func(status c.Int, addr *net.AddrInfo) { + // if status != 0 { + // resolve(tuple.T2[*net.SockAddr, error](nil, libuvError(status))) + // return + // } + // resolve(tuple.T2[*net.SockAddr, error](addr.Addr, nil)) + // }) + // if res := libuv.Getaddrinfo(async.Exec().L, req, cb, c.AllocaCStr(host), c.AllocaCStr(port), hints); res != 0 { + // resolve(tuple.T2[*net.SockAddr, error](nil, libuvError(res))) + // return + // } + bind := &getAddrInfoBind{ + resolve: resolve, + } + if res := libuv.Getaddrinfo(async.Exec().L, &bind.GetAddrInfo, getAddrInfoCb, c.AllocaCStr(host), c.AllocaCStr(port), hints); res != 0 { + resolve(tuple.T2[*net.SockAddr, error](nil, libuvError(res))) + return + } + }) +} + +func Listen(protocol, bindAddr string) async.Future[tuple.Tuple2[*Tcp, error]] { + return async.Async(func(resolve func(tuple.Tuple2[*Tcp, error])) { + tcp, err := NewTcp() + if err != nil { + resolve(tuple.T2[*Tcp, error](nil, err)) + return + } + ParseAddr(bindAddr)(func(v tuple.Tuple2[*net.SockAddr, error]) { + addr, err := v.Get() + if err != nil { + resolve(tuple.T2[*Tcp, error](nil, err)) + return + } + if err := tcp.Bind(addr, 0); err != nil { + resolve(tuple.T2[*Tcp, error](nil, err)) + return + } + if err := tcp.Listen(128, func(server *Tcp, err error) { + resolve(tuple.T2[*Tcp, error](server, err)) + }); err != nil { + resolve(tuple.T2[*Tcp, error](nil, err)) + } + }) + }) +} + +func NewTcp() (*Tcp, error) { + t := &Tcp{} + if res := libuv.InitTcp(async.Exec().L, &t.tcp); res != 0 { + return nil, libuvError(res) + } + return t, nil +} + +func (t *Tcp) Bind(addr *net.SockAddr, flags uint) error { + if res := t.tcp.Bind(addr, c.Uint(flags)); res != 0 { + return libuvError(res) + } + return nil +} + +func (t *Tcp) Listen(backlog int, cb func(server *Tcp, err error)) error { + t.listenCb = cb + res := (*libuv.Stream)(&t.tcp).Listen(c.Int(backlog), func(s *libuv.Stream, status c.Int) { + server := (*Tcp)(unsafe.Pointer(s)) + if status != 0 { + server.listenCb(server, libuvError(libuv.Errno(status))) + } else { + server.listenCb(server, nil) + } + }) + if res != 0 { + return libuvError(res) + } + return nil +} + +func (t *Tcp) Accept() (client *Tcp, err error) { + tcp := &Tcp{} + if res := libuv.InitTcp(async.Exec().L, &tcp.tcp); res != 0 { + return nil, libuvError(res) + } + if res := (*libuv.Stream)(&t.tcp).Accept((*libuv.Stream)(&tcp.tcp)); res != 0 { + return nil, libuvError(res) + } + return tcp, nil +} + +type connectBind struct { + libuv.Connect + tcp *Tcp + resolve func(tuple.Tuple2[*Tcp, error]) +} + +func connectCb(p *libuv.Connect, status c.Int) { + bind := (*connectBind)(unsafe.Pointer(p)) + if status != 0 { + bind.resolve(tuple.T2[*Tcp, error](nil, libuvError(libuv.Errno(status)))) + } else { + bind.resolve(tuple.T2[*Tcp, error](bind.tcp, nil)) + } +} + +func Connect(addr *net.SockAddr) async.Future[tuple.Tuple2[*Tcp, error]] { + return async.Async(func(resolve func(tuple.Tuple2[*Tcp, error])) { + tcp := &Tcp{} + if res := libuv.InitTcp(async.Exec().L, &tcp.tcp); res != 0 { + resolve(tuple.T2[*Tcp, error]((*Tcp)(nil), libuvError(res))) + return + } + // req, _ := cbind.Bind1[libuv.Connect](func(status c.Int) { + // if status != 0 { + // resolve(tuple.T2[*Tcp, error]((*Tcp)(nil), libuvError(libuv.Errno(status)))) + // } else { + // resolve(tuple.T2[*Tcp, error](tcp, nil)) + // } + // }) + req := &connectBind{ + tcp: tcp, + resolve: resolve, + } + if res := libuv.TcpConnect(&req.Connect, &req.tcp.tcp, addr, connectCb); res != 0 { + resolve(tuple.T2[*Tcp, error]((*Tcp)(nil), libuvError(res))) + return + } + }) +} + +func allocBuffer(handle *libuv.Handle, suggestedSize uintptr, buf *libuv.Buf) { + buf.Base = (*c.Char)(c.Malloc(suggestedSize)) + buf.Len = suggestedSize +} + +func (t *Tcp) StartRead(fn func(data []byte, err error)) { + t.readCb = func(data []byte, err error) { + fn(data, err) + } + tcp := (*libuv.Stream)(&t.tcp) + res := tcp.StartRead(allocBuffer, func(client *libuv.Stream, nread c.Long, buf *libuv.Buf) { + tcp := (*Tcp)(unsafe.Pointer(client)) + if nread > 0 { + tcp.readCb(cbind.GoBytes(buf.Base, int(nread)), nil) + } else if nread < 0 { + tcp.readCb(nil, libuvError(libuv.Errno(nread))) + } else { + tcp.readCb(nil, nil) + } + }) + if res != 0 { + t.readCb(nil, libuvError(libuv.Errno(res))) + } +} + +func (t *Tcp) StopRead() error { + tcp := (*libuv.Stream)(&t.tcp) + if res := tcp.StopRead(); res != 0 { + return libuvError(libuv.Errno(res)) + } + return nil +} + +// Read once from the TCP connection. +func (t *Tcp) Read() async.Future[tuple.Tuple2[[]byte, error]] { + return async.Async(func(resolve func(tuple.Tuple2[[]byte, error])) { + t.StartRead(func(data []byte, err error) { + if err := t.StopRead(); err != nil { + panic(err) + } + resolve(tuple.T2[[]byte, error](data, err)) + }) + }) +} + +func (t *Tcp) Write(data []byte) async.Future[error] { + return async.Async(func(resolve func(error)) { + writer, _ := cbind.Bind1[libuv.Write](func(req *libuv.Write, status c.Int) { + var result error + if status != 0 { + result = libuvError(libuv.Errno(status)) + } + resolve(result) + }) + tcp := (*libuv.Stream)(&t.tcp) + buf, len := cbind.CBuffer(data) + bufs := &libuv.Buf{Base: buf, Len: uintptr(len)} + writer.Write(tcp, bufs, 1, cbind.Callback1[libuv.Write, c.Int]) + }) +} + +// Don't use this funciton, just for deubg closure problem. +func (t *Tcp) Write1(data []byte) async.Future[error] { + return async.Async(func(resolve func(e error)) { + writer, cb := cbind.Bind1F[libuv.Write, libuv.WriteCb](func(req *libuv.Write, status c.Int) { + if status != 0 { + resolve(libuvError(libuv.Errno(status))) + return + } + resolve(nil) + }) + tcp := (*libuv.Stream)(&t.tcp) + buf, len := cbind.CBuffer(data) + bufs := &libuv.Buf{Base: buf, Len: uintptr(len)} + writer.Write(tcp, bufs, 1, cb) + }) +} + +func (t *Tcp) Close() { + (*libuv.Handle)(unsafe.Pointer(&t.tcp)).Close(nil) +}