From fce06722826051ffb62a45f6fbb14436889046b8 Mon Sep 17 00:00:00 2001 From: Li Jie Date: Fri, 6 Sep 2024 22:29:42 +0800 Subject: [PATCH] make future IO working both on go and llgo --- x/async/_demo/all/all.go | 140 +++------ x/async/async.go | 37 +-- x/async/async_go.go | 74 ++--- x/async/async_llgo.go | 29 +- x/async/executor_go.go | 14 +- x/async/executor_llgo.go | 7 +- x/async/timeout/timeout_go.go | 6 +- x/async/timeout/timeout_llgo.go | 3 + x/cbind/cbind.go | 11 - x/io/_demo/asyncdemo/async.go | 290 ------------------ x/{io => socketio}/README.md | 0 x/socketio/socketio_go.go | 92 ++++++ .../io_llgo.go => socketio/socketio_llgo.go} | 191 +++++------- 13 files changed, 284 insertions(+), 610 deletions(-) delete mode 100644 x/io/_demo/asyncdemo/async.go rename x/{io => socketio}/README.md (100%) create mode 100644 x/socketio/socketio_go.go rename x/{io/io_llgo.go => socketio/socketio_llgo.go} (52%) diff --git a/x/async/_demo/all/all.go b/x/async/_demo/all/all.go index 56e0c4b6..0f78e648 100644 --- a/x/async/_demo/all/all.go +++ b/x/async/_demo/all/all.go @@ -5,18 +5,16 @@ import ( "os" "time" - "github.com/goplus/llgo/c" - "github.com/goplus/llgo/c/net" "github.com/goplus/llgo/x/async" "github.com/goplus/llgo/x/async/timeout" - "github.com/goplus/llgo/x/io" + "github.com/goplus/llgo/x/socketio" "github.com/goplus/llgo/x/tuple" ) func ReadFile(fileName string) async.Future[tuple.Tuple2[[]byte, error]] { return async.Async(func(resolve func(tuple.Tuple2[[]byte, error])) { go func() { - println(async.Gettid(), "read file", fileName) + println("read file", fileName) bytes, err := os.ReadFile(fileName) resolve(tuple.T2(bytes, err)) }() @@ -50,16 +48,17 @@ func main() { func RunIO() { println("RunIO with Await") + // Hide `resolve` in Go+ async.Run(async.Async(func(resolve func(async.Void)) { println("read file") defer resolve(async.Void{}) - content, err := async.Await(ReadFile("1.txt")).Get() + content, err := async.Await(ReadFile("all.go")).Get() if err != nil { fmt.Printf("read err: %v\n", err) return } fmt.Printf("read content: %s\n", content) - err = async.Await(WriteFile("2.txt", content)) + err = async.Await(WriteFile("2.out", content)) if err != nil { fmt.Printf("write err: %v\n", err) return @@ -71,7 +70,7 @@ func RunIO() { println("RunIO with BindIO") async.Run(async.Async(func(resolve func(async.Void)) { - ReadFile("1.txt")(func(v tuple.Tuple2[[]byte, error]) { + ReadFile("all.go")(func(v tuple.Tuple2[[]byte, error]) { content, err := v.Get() if err != nil { fmt.Printf("read err: %v\n", err) @@ -79,7 +78,7 @@ func RunIO() { return } fmt.Printf("read content: %s\n", content) - WriteFile("2.txt", content)(func(v error) { + WriteFile("2.out", content)(func(v error) { err = v if err != nil { fmt.Printf("write err: %v\n", err) @@ -173,129 +172,66 @@ func RunSocket() { println("RunClient") - RunClient()(func(async.Void) { - println("RunClient done") - resolve(async.Void{}) + timeout.Timeout(100 * time.Millisecond)(func(async.Void) { + RunClient()(func(async.Void) { + println("RunClient done") + resolve(async.Void{}) + }) }) })) } func RunClient() async.Future[async.Void] { return async.Async(func(resolve func(async.Void)) { - bindAddr := "127.0.0.1:3927" - io.ParseAddr(bindAddr)(func(v tuple.Tuple2[*net.SockAddr, error]) { - addr, err := v.Get() - println("Connect to", addr, err) + addr := "127.0.0.1:3927" + socketio.Connect("tcp", addr)(func(v tuple.Tuple2[*socketio.Conn, error]) { + client, err := v.Get() + println("Connected", client, err) if err != nil { panic(err) } - io.Connect(addr)(func(v tuple.Tuple2[*io.Tcp, error]) { - client, err := v.Get() - println("Connected", client, err) - if err != nil { - panic(err) - } - var loop func(client *io.Tcp) - loop = func(client *io.Tcp) { - client.Write([]byte("Hello"))(func(err error) { + counter := 0 + var loop func(client *socketio.Conn) + loop = func(client *socketio.Conn) { + counter++ + data := fmt.Sprintf("Hello %d", counter) + client.Write([]byte(data))(func(err error) { + if err != nil { + panic(err) + } + client.Read()(func(v tuple.Tuple2[[]byte, error]) { + data, err := v.Get() if err != nil { panic(err) } - client.Read()(func(v tuple.Tuple2[[]byte, error]) { - data, err := v.Get() - if err != nil { - panic(err) - } - println("Read:", string(data)) - timeout.Timeout(1 * time.Second)(func(async.Void) { - loop(client) - }) + println("Read from server:", string(data)) + timeout.Timeout(1 * time.Second)(func(async.Void) { + loop(client) }) }) - } - loop(client) - }) + }) + } + loop(client) }) }) } func RunServer() async.Future[async.Void] { return async.Async(func(resolve func(async.Void)) { - server, err := io.NewTcp() - if err != nil { - panic(err) - } - - bindAddr := "0.0.0.0:3927" - io.ParseAddr(bindAddr)(func(v tuple.Tuple2[*net.SockAddr, error]) { - addr, err := v.Get() - if err != nil { - panic(err) - } - - if err = server.Bind(addr, 0); err != nil { - panic(err) - } - c.Printf(c.Str("Listening on %s\n"), c.AllocaCStr(bindAddr)) - - err = server.Listen(128, func(server *io.Tcp, err error) { - if err != nil { - panic(err) - } - client, err := server.Accept() - println("Accept", client, err) - - var loop func(client *io.Tcp) - loop = func(client *io.Tcp) { - client.Read()(func(v tuple.Tuple2[[]byte, error]) { - data, err := v.Get() - if err != nil { - println("Read error", err) - } else { - println("Read:", string(data)) - client.Write(data)(func(err error) { - if err != nil { - println("Write error", err) - } else { - println("Write done") - loop(client) - } - }) - } - }) - } - loop(client) - }) - if err != nil { - panic(err) - } - }) - }) -} - -func RunServer1() async.Future[async.Void] { - return async.Async(func(resolve func(async.Void)) { - io.Listen("tcp", "0.0.0.0:3927")(func(v tuple.Tuple2[*io.Tcp, error]) { - server, err := v.Get() - if err != nil { - panic(err) - } - client, err := server.Accept() - println("Accept", client, err) - - var loop func(client *io.Tcp) - loop = func(client *io.Tcp) { + socketio.Listen("tcp", "0.0.0.0:3927", func(client *socketio.Conn, err error) { + println("Client connected", client, err) + var loop func(client *socketio.Conn) + loop = func(client *socketio.Conn) { client.Read()(func(v tuple.Tuple2[[]byte, error]) { data, err := v.Get() if err != nil { println("Read error", err) } else { - println("Read:", string(data)) + println("Read from client:", string(data)) client.Write(data)(func(err error) { if err != nil { println("Write error", err) } else { - println("Write done") loop(client) } }) diff --git a/x/async/async.go b/x/async/async.go index b1d4f220..268bce60 100644 --- a/x/async/async.go +++ b/x/async/async.go @@ -17,45 +17,14 @@ package async import ( - "unsafe" _ "unsafe" - - "github.com/goplus/llgo/c/libuv" ) type Void = [0]byte type Future[T any] func(func(T)) -type asyncBind[T any] struct { - libuv.Async - result T - chain func(T) -} - -func asyncCb[T any](a *libuv.Async) { - a.Close(nil) - aa := (*asyncBind[T])(unsafe.Pointer(a)) - aa.chain(aa.result) -} - -func Async[T any](fn func(func(T))) Future[T] { - return func(chain func(T)) { - loop := Exec().L - // var result T - // var a *libuv.Async - // var cb libuv.AsyncCb - // a, cb = cbind.BindF[libuv.Async, libuv.AsyncCb](func() { - // a.Close(nil) - // chain(result) - // }) - // loop.Async(a, cb) - - aa := &asyncBind[T]{chain: chain} - loop.Async(&aa.Async, asyncCb[T]) - fn(func(v T) { - aa.result = v - aa.Send() - }) - } +// Just for pure LLGo/Go, transpile to callback in Go+ +func Await[T1 any](call Future[T1]) (ret T1) { + return Run(call) } diff --git a/x/async/async_go.go b/x/async/async_go.go index cec40669..c775b7e8 100644 --- a/x/async/async_go.go +++ b/x/async/async_go.go @@ -1,5 +1,5 @@ -//go:build llgo11 -// +build llgo11 +//go:build !llgo +// +build !llgo /* * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. @@ -21,71 +21,47 @@ package async import "sync" -func BindIO[T any](call IO[T], callback func(T)) { - callback(Await(call)) -} - -func Await[T1 any](call IO[T1]) (ret T1) { - ch := make(chan struct{}) - f := call(&AsyncContext{ - Executor: Exec(), - complete: func() { - close(ch) - }, - }) - <-ch - return f() +func Async[T any](fn func(func(T))) Future[T] { + return func(chain func(T)) { + go fn(chain) + } } // ----------------------------------------------------------------------------- -func Race[T1 any](calls ...IO[T1]) IO[T1] { +func Race[T1 any](futures ...Future[T1]) Future[T1] { return Async(func(resolve func(T1)) { - ch := make(chan int, len(calls)) - futures := make([]Future[T1], len(calls)) - for i, call := range calls { - i := i - call := call - go func() { - f := call(&AsyncContext{ - Executor: Exec(), - complete: func() { - defer func() { - _ = recover() - }() - ch <- i - }, - }) - futures[i] = f - }() + ch := make(chan T1) + for _, future := range futures { + future := future + future(func(v T1) { + defer func() { + // Avoid panic when the channel is closed. + _ = recover() + }() + ch <- v + }) } - i := <-ch + v := <-ch close(ch) - resolve(futures[i]()) + resolve(v) }) } -func All[T1 any](calls ...IO[T1]) IO[[]T1] { +func All[T1 any](futures ...Future[T1]) Future[[]T1] { return Async(func(resolve func([]T1)) { - n := len(calls) + n := len(futures) results := make([]T1, n) - futures := make([]Future[T1], n) wg := sync.WaitGroup{} wg.Add(n) - for i, call := range calls { + for i, future := range futures { i := i - f := call(&AsyncContext{ - Executor: Exec(), - complete: func() { - wg.Done() - }, + future(func(v T1) { + results[i] = v + wg.Done() }) - futures[i] = f } wg.Wait() - for i, f := range futures { - results[i] = f() - } resolve(results) }) } diff --git a/x/async/async_llgo.go b/x/async/async_llgo.go index 1c1b0cb6..d301b20c 100644 --- a/x/async/async_llgo.go +++ b/x/async/async_llgo.go @@ -1,3 +1,6 @@ +//go:build llgo +// +build llgo + /* * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. * @@ -18,10 +21,30 @@ package async import ( "sync/atomic" + + "github.com/goplus/llgo/c/libuv" + "github.com/goplus/llgo/x/cbind" ) -func Await[T1 any](call Future[T1]) (ret T1) { - return Run(call) +// Currently Async run chain a future that call chain in the goroutine running `async.Run`. +// TODO(lijie): It would better to switch when needed. +func Async[T any](fn func(func(T))) Future[T] { + return func(chain func(T)) { + loop := Exec().L + + var result T + var a *libuv.Async + var cb libuv.AsyncCb + a, cb = cbind.BindF[libuv.Async, libuv.AsyncCb](func(a *libuv.Async) { + a.Close(nil) + chain(result) + }) + loop.Async(a, cb) + fn(func(v T) { + result = v + a.Send() + }) + } } // ----------------------------------------------------------------------------- @@ -32,6 +55,7 @@ func Race[T1 any](futures ...Future[T1]) Future[T1] { for _, future := range futures { future(func(v T1) { if !done.Swap(true) { + // Just resolve the first one. resolve(v) } }) @@ -49,6 +73,7 @@ func All[T1 any](futures ...Future[T1]) Future[[]T1] { future(func(v T1) { results[i] = v if atomic.AddUint32(&done, 1) == uint32(n) { + // All done. resolve(results) } }) diff --git a/x/async/executor_go.go b/x/async/executor_go.go index 8ef70dc3..cd9fb891 100644 --- a/x/async/executor_go.go +++ b/x/async/executor_go.go @@ -1,5 +1,5 @@ -//go:build llgo11 -// +build llgo11 +//go:build !llgo +// +build !llgo /* * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. @@ -28,6 +28,12 @@ func Exec() *Executor { return exec } -func Run(fn func()) { - fn() +func Run[T any](future Future[T]) (ret T) { + ch := make(chan T) + go func() { + future(func(v T) { + ch <- v + }) + }() + return <-ch } diff --git a/x/async/executor_llgo.go b/x/async/executor_llgo.go index 3018cc3a..fd5e4e14 100644 --- a/x/async/executor_llgo.go +++ b/x/async/executor_llgo.go @@ -1,3 +1,6 @@ +//go:build llgo +// +build llgo + /* * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. * @@ -19,14 +22,10 @@ package async import ( "unsafe" - "github.com/goplus/llgo/c" "github.com/goplus/llgo/c/libuv" "github.com/goplus/llgo/c/pthread" ) -//go:linkname Gettid C.pthread_self -func Gettid() c.Pointer - var execKey pthread.Key func init() { diff --git a/x/async/timeout/timeout_go.go b/x/async/timeout/timeout_go.go index 2315c9a3..32839079 100644 --- a/x/async/timeout/timeout_go.go +++ b/x/async/timeout/timeout_go.go @@ -1,5 +1,5 @@ -//go:build llgo11 -// +build llgo11 +//go:build !llgo +// +build !llgo /* * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. @@ -25,7 +25,7 @@ import ( "github.com/goplus/llgo/x/async" ) -func Timeout(d time.Duration) async.IO[async.Void] { +func Timeout(d time.Duration) async.Future[async.Void] { return async.Async(func(resolve func(async.Void)) { go func() { time.Sleep(d) diff --git a/x/async/timeout/timeout_llgo.go b/x/async/timeout/timeout_llgo.go index 0bc07ab3..ed18ce26 100644 --- a/x/async/timeout/timeout_llgo.go +++ b/x/async/timeout/timeout_llgo.go @@ -1,3 +1,6 @@ +//go:build llgo +// +build llgo + /* * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. * diff --git a/x/cbind/cbind.go b/x/cbind/cbind.go index 562d5595..84b87528 100644 --- a/x/cbind/cbind.go +++ b/x/cbind/cbind.go @@ -83,17 +83,6 @@ func Callback3[Base any, A any, B any, C any](base *Base, a A, b B, c C) { * libuv.InitTimer(async.Exec().L, timer) * timer.Start(cb, 1000, 0) * - * TODO(lijie): fn isn't a C func-ptr, it's closure, should fix the LLGo compiler. - * See: https://github.com/goplus/llgo/issues/766 - * - * Workaround: - * - * timer, _ := cbind.Bind[libuv.Timer](func() { - * println("hello") - * }) - * libuv.InitTimer(async.Exec().L, timer) - * timer.Start(cbind.Callback[libuv.Timer], 1000, 0) - * * @param call The Go function to bind. * @return The data pointer and the C callback function. */ diff --git a/x/io/_demo/asyncdemo/async.go b/x/io/_demo/asyncdemo/async.go deleted file mode 100644 index 02b4475a..00000000 --- a/x/io/_demo/asyncdemo/async.go +++ /dev/null @@ -1,290 +0,0 @@ -package main - -import ( - "encoding/json" - "fmt" - "log" - "time" - - "github.com/goplus/llgo/x/io" -) - -// ----------------------------------------------------------------------------- - -type Response struct { - StatusCode int - - mockBody string -} - -func (r *Response) mock(body string) { - r.mockBody = body -} - -func (r *Response) Text() (resolve io.Promise[string]) { - resolve(r.mockBody, nil) - return -} - -func (r *Response) TextCompiled() *io.PromiseImpl[string] { - P := &io.PromiseImpl[string]{} - P.Func = func(resolve func(string, error)) { - for { - switch P.Prev = P.Next; P.Prev { - case 0: - resolve(r.mockBody, nil) - P.Next = -1 - return - default: - panic("Promise already done") - } - } - } - return P -} - -func HttpGet(url string, callback func(resp *Response, err error)) { - resp := &Response{StatusCode: 200} - callback(resp, nil) -} - -func AsyncHttpGet(url string) (resolve io.Promise[*Response]) { - HttpGet(url, resolve) - return -} - -func AsyncHttpGetCompiled(url string) *io.PromiseImpl[*Response] { - P := &io.PromiseImpl[*Response]{} - P.Func = func(resolve func(*Response, error)) { - for { - switch P.Prev = P.Next; P.Prev { - case 0: - HttpGet(url, resolve) - P.Next = -1 - return - default: - panic("Promise already done") - } - } - } - return P -} - -// ----------------------------------------------------------------------------- - -type User struct { - Name string -} - -func GetUser(uid string) (resolve io.Promise[User]) { - resp, err := AsyncHttpGet("http://example.com/user/" + uid).Await() - if err != nil { - resolve(User{}, err) - return - } - - if resp.StatusCode != 200 { - resolve(User{}, fmt.Errorf("http status code: %d", resp.StatusCode)) - return - } - - resp.mock(`{"name":"Alice"}`) - - body, err := resp.Text().Await() - if err != nil { - resolve(User{}, err) - return - } - user := User{} - if err := json.Unmarshal([]byte(body), &user); err != nil { - resolve(User{}, err) - return - } - - resolve(user, nil) - return -} - -func GetUserCompiled(uid string) *io.PromiseImpl[User] { - var state1 *io.PromiseImpl[*Response] - var state2 *io.PromiseImpl[string] - - P := &io.PromiseImpl[User]{} - P.Func = func(resolve func(User, error)) { - for { - switch P.Prev = P.Next; P.Prev { - case 0: - state1 = AsyncHttpGetCompiled("http://example.com/user/" + uid) - P.Next = 1 - return - case 1: - state1.EnsureDone() - resp, err := state1.Value, state1.Err - if err != nil { - resolve(User{}, err) - return - } - - if resp.StatusCode != 200 { - resolve(User{}, fmt.Errorf("http status code: %d", resp.StatusCode)) - return - } - - resp.mock(`{"name":"Alice"}`) - - state2 = resp.TextCompiled() - P.Next = 2 - return - case 2: - state2.EnsureDone() - body, err := state2.Value, state2.Err - if err != nil { - resolve(User{}, err) - return - } - user := User{} - if err := json.Unmarshal([]byte(body), &user); err != nil { - resolve(User{}, err) - return - } - - resolve(user, nil) - P.Next = -1 - return - default: - panic("Promise already done") - } - } - } - return P -} - -func GetScore() *io.Promise[float64] { - panic("todo: GetScore") -} - -func GetScoreCompiled() *io.PromiseImpl[float64] { - P := &io.PromiseImpl[float64]{} - P.Func = func(resolve func(float64, error)) { - for { - switch P.Prev = P.Next; P.Prev { - case 0: - panic("todo: GetScore") - default: - panic("Promise already done") - } - } - } - return P -} - -func DoUpdate(op string) *io.Promise[io.Void] { - panic("todo: DoUpdate") -} - -func DoUpdateCompiled(op string) *io.PromiseImpl[io.Void] { - P := &io.PromiseImpl[io.Void]{} - P.Func = func(resolve func(io.Void, error)) { - for { - switch P.Prev = P.Next; P.Prev { - case 0: - panic("todo: DoUpdate") - default: - panic("Promise already done") - } - } - } - return P -} - -func Demo() (resolve io.Promise[io.Void]) { - user, err := GetUser("123").Await() - log.Println(user, err) - - user, err = io.Race[User](GetUser("123"), GetUser("456"), GetUser("789")).Await() - log.Println(user, err) - - users, err := io.All[User]([]io.AsyncCall[User]{GetUser("123"), GetUser("456"), GetUser("789")}).Await() - log.Println(users, err) - - user, score, _, err := io.Await3[User, float64, io.Void](GetUser("123"), GetScore(), DoUpdate("update sth.")) - log.Println(user, score, err) - - // TODO(lijie): select from multiple promises without channel - select { - case user := <-GetUser("123").Chan(): - log.Println("user:", user) - case score := <-GetScore().Chan(): - log.Println("score:", score) - case <-io.Timeout(5 * time.Second).Chan(): - log.Println("timeout") - } - return -} - -func DemoCompiled() *io.PromiseImpl[io.Void] { - var state1 *io.PromiseImpl[User] - var state2 *io.PromiseImpl[User] - var state3 *io.PromiseImpl[[]User] - var state4 *io.PromiseImpl[io.Await3Result[User, float64, io.Void]] - - P := &io.PromiseImpl[io.Void]{} - P.Func = func(resolve func(io.Void, error)) { - for { - switch P.Prev = P.Next; P.Prev { - case 0: - state1 = GetUserCompiled("123") - P.Next = 1 - return - case 1: - state1.EnsureDone() - user, err := state1.Value, state1.Err - log.Printf("user: %v, err: %v\n", user, err) - - state2 = io.Race[User](GetUserCompiled("123"), GetUserCompiled("456"), GetUserCompiled("789")) - P.Next = 2 - return - case 2: - state2.EnsureDone() - user, err := state2.Value, state2.Err - log.Println(user, err) - - state3 = io.All[User]([]io.AsyncCall[User]{GetUserCompiled("123"), GetUserCompiled("456"), GetUserCompiled("789")}) - P.Next = 3 - return - case 3: - state3.EnsureDone() - users, err := state3.Value, state3.Err - log.Println(users, err) - - state4 = io.Await3Compiled[User, float64, io.Void](GetUserCompiled("123"), GetScoreCompiled(), DoUpdateCompiled("update sth.")) - P.Next = 4 - return - case 4: - state4.EnsureDone() - user, score, _, err := state4.Value.V1, state4.Value.V2, state4.Value.V3, state4.Value.Err - log.Println(user, score, err) - - select { - case user := <-GetUserCompiled("123").Chan(): - log.Println("user:", user) - case score := <-GetScoreCompiled().Chan(): - log.Println("score:", score) - case <-io.TimeoutCompiled(5 * time.Second).Chan(): - log.Println("timeout") - } - P.Next = -1 - return - default: - panic("Promise already done") - } - } - } - return P -} - -func main() { - log.SetFlags(log.Lshortfile | log.LstdFlags) - // io.Run(Demo()) - io.Run(DemoCompiled()) -} diff --git a/x/io/README.md b/x/socketio/README.md similarity index 100% rename from x/io/README.md rename to x/socketio/README.md diff --git a/x/socketio/socketio_go.go b/x/socketio/socketio_go.go new file mode 100644 index 00000000..9a32e433 --- /dev/null +++ b/x/socketio/socketio_go.go @@ -0,0 +1,92 @@ +//go:build !llgo +// +build !llgo + +/* + * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package socketio + +import ( + "net" + + "github.com/goplus/llgo/x/async" + "github.com/goplus/llgo/x/tuple" +) + +type Conn struct { + conn net.Conn +} + +func Listen(protocol, bindAddr string, listenCb func(client *Conn, err error)) { + go func() { + listener, err := net.Listen(protocol, bindAddr) + if err != nil { + listenCb(nil, err) + return + } + for { + conn, err := listener.Accept() + if err != nil { + listenCb(nil, err) + return + } + listenCb(&Conn{conn: conn}, nil) + } + }() +} + +func Connect(network, addr string) async.Future[tuple.Tuple2[*Conn, error]] { + return async.Async(func(resolve func(tuple.Tuple2[*Conn, error])) { + go func() { + conn, err := net.Dial(network, addr) + if err != nil { + resolve(tuple.T2[*Conn, error](nil, err)) + return + } + resolve(tuple.T2[*Conn, error](&Conn{conn: conn}, nil)) + }() + }) +} + +// Read once from the TCP connection. +func (t *Conn) Read() async.Future[tuple.Tuple2[[]byte, error]] { + return async.Async(func(resolve func(tuple.Tuple2[[]byte, error])) { + go func() { + buf := make([]byte, 1024) + n, err := t.conn.Read(buf) + if err != nil { + resolve(tuple.T2[[]byte, error](nil, err)) + return + } + resolve(tuple.T2[[]byte, error](buf[:n], nil)) + }() + }) +} + +func (t *Conn) Write(data []byte) async.Future[error] { + return async.Async(func(resolve func(error)) { + go func() { + _, err := t.conn.Write(data) + resolve(err) + }() + }) +} + +func (t *Conn) Close() { + if t.conn != nil { + t.conn.Close() + } +} diff --git a/x/io/io_llgo.go b/x/socketio/socketio_llgo.go similarity index 52% rename from x/io/io_llgo.go rename to x/socketio/socketio_llgo.go index a598bea8..cff5305d 100644 --- a/x/io/io_llgo.go +++ b/x/socketio/socketio_llgo.go @@ -1,3 +1,6 @@ +//go:build llgo +// +build llgo + /* * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. * @@ -14,7 +17,7 @@ * limitations under the License. */ -package io +package socketio import ( "strings" @@ -30,11 +33,14 @@ import ( "github.com/goplus/llgo/x/tuple" ) -type Tcp struct { +type Listener struct { tcp libuv.Tcp - listenCb func(server *Tcp, err error) - readCb func([]byte, error) - writeCb func(int, error) + listenCb func(server *Listener, err error) +} + +type Conn struct { + tcp libuv.Tcp + readCb func([]byte, error) } type libuvError libuv.Errno @@ -58,7 +64,7 @@ func getAddrInfoCb(p *libuv.GetAddrInfo, status c.Int, addr *net.AddrInfo) { bind.resolve(tuple.T2[*net.SockAddr, error](addr.Addr, nil)) } -func ParseAddr(addr string) async.Future[tuple.Tuple2[*net.SockAddr, error]] { +func parseAddr(addr string) async.Future[tuple.Tuple2[*net.SockAddr, error]] { return async.Async(func(resolve func(tuple.Tuple2[*net.SockAddr, error])) { host := "127.0.0.1" var port string @@ -78,73 +84,64 @@ func ParseAddr(addr string) async.Future[tuple.Tuple2[*net.SockAddr, error]] { Flags: 0, } - // TODO(lijie): closure problem, instead with a struct to hold the resolve function. - // req, cb := cbind.Bind2F[libuv.GetAddrInfo, libuv.GetaddrinfoCb](func(status c.Int, addr *net.AddrInfo) { - // if status != 0 { - // resolve(tuple.T2[*net.SockAddr, error](nil, libuvError(status))) - // return - // } - // resolve(tuple.T2[*net.SockAddr, error](addr.Addr, nil)) - // }) - // if res := libuv.Getaddrinfo(async.Exec().L, req, cb, c.AllocaCStr(host), c.AllocaCStr(port), hints); res != 0 { - // resolve(tuple.T2[*net.SockAddr, error](nil, libuvError(res))) - // return - // } - bind := &getAddrInfoBind{ - resolve: resolve, - } - if res := libuv.Getaddrinfo(async.Exec().L, &bind.GetAddrInfo, getAddrInfoCb, c.AllocaCStr(host), c.AllocaCStr(port), hints); res != 0 { + req, cb := cbind.Bind2F[libuv.GetAddrInfo, libuv.GetaddrinfoCb](func(i *libuv.GetAddrInfo, status c.Int, addr *net.AddrInfo) { + if status != 0 { + resolve(tuple.T2[*net.SockAddr, error](nil, libuvError(status))) + return + } + resolve(tuple.T2[*net.SockAddr, error](addr.Addr, nil)) + }) + if res := libuv.Getaddrinfo(async.Exec().L, req, cb, c.AllocaCStr(host), c.AllocaCStr(port), hints); res != 0 { resolve(tuple.T2[*net.SockAddr, error](nil, libuvError(res))) return } }) } -func Listen(protocol, bindAddr string) async.Future[tuple.Tuple2[*Tcp, error]] { - return async.Async(func(resolve func(tuple.Tuple2[*Tcp, error])) { - tcp, err := NewTcp() +func Listen(protocol, bindAddr string, listenCb func(client *Conn, err error)) { + tcp, err := newListener() + if err != nil { + listenCb(nil, err) + return + } + parseAddr(bindAddr)(func(v tuple.Tuple2[*net.SockAddr, error]) { + addr, err := v.Get() if err != nil { - resolve(tuple.T2[*Tcp, error](nil, err)) + listenCb(nil, err) return } - ParseAddr(bindAddr)(func(v tuple.Tuple2[*net.SockAddr, error]) { - addr, err := v.Get() - if err != nil { - resolve(tuple.T2[*Tcp, error](nil, err)) - return - } - if err := tcp.Bind(addr, 0); err != nil { - resolve(tuple.T2[*Tcp, error](nil, err)) - return - } - if err := tcp.Listen(128, func(server *Tcp, err error) { - resolve(tuple.T2[*Tcp, error](server, err)) - }); err != nil { - resolve(tuple.T2[*Tcp, error](nil, err)) - } - }) + if err := tcp.bind(addr, 0); err != nil { + listenCb(nil, err) + return + } + if err := tcp.listen(128, func(server *Listener, err error) { + client, err := server.accept() + listenCb(client, err) + }); err != nil { + listenCb(nil, err) + } }) } -func NewTcp() (*Tcp, error) { - t := &Tcp{} +func newListener() (*Listener, error) { + t := &Listener{} if res := libuv.InitTcp(async.Exec().L, &t.tcp); res != 0 { return nil, libuvError(res) } return t, nil } -func (t *Tcp) Bind(addr *net.SockAddr, flags uint) error { +func (t *Listener) bind(addr *net.SockAddr, flags uint) error { if res := t.tcp.Bind(addr, c.Uint(flags)); res != 0 { return libuvError(res) } return nil } -func (t *Tcp) Listen(backlog int, cb func(server *Tcp, err error)) error { - t.listenCb = cb - res := (*libuv.Stream)(&t.tcp).Listen(c.Int(backlog), func(s *libuv.Stream, status c.Int) { - server := (*Tcp)(unsafe.Pointer(s)) +func (l *Listener) listen(backlog int, cb func(server *Listener, err error)) error { + l.listenCb = cb + res := (*libuv.Stream)(&l.tcp).Listen(c.Int(backlog), func(s *libuv.Stream, status c.Int) { + server := (*Listener)(unsafe.Pointer(s)) if status != 0 { server.listenCb(server, libuvError(libuv.Errno(status))) } else { @@ -157,54 +154,43 @@ func (t *Tcp) Listen(backlog int, cb func(server *Tcp, err error)) error { return nil } -func (t *Tcp) Accept() (client *Tcp, err error) { - tcp := &Tcp{} +func (l *Listener) accept() (client *Conn, err error) { + tcp := &Conn{} if res := libuv.InitTcp(async.Exec().L, &tcp.tcp); res != 0 { return nil, libuvError(res) } - if res := (*libuv.Stream)(&t.tcp).Accept((*libuv.Stream)(&tcp.tcp)); res != 0 { + if res := (*libuv.Stream)(&l.tcp).Accept((*libuv.Stream)(&tcp.tcp)); res != 0 { return nil, libuvError(res) } return tcp, nil } -type connectBind struct { - libuv.Connect - tcp *Tcp - resolve func(tuple.Tuple2[*Tcp, error]) -} +func Connect(network, addr string) async.Future[tuple.Tuple2[*Conn, error]] { + return async.Async(func(resolve func(tuple.Tuple2[*Conn, error])) { + parseAddr(addr)(func(v tuple.Tuple2[*net.SockAddr, error]) { + addr, err := v.Get() + if err != nil { + resolve(tuple.T2[*Conn, error]((*Conn)(nil), err)) + return + } -func connectCb(p *libuv.Connect, status c.Int) { - bind := (*connectBind)(unsafe.Pointer(p)) - if status != 0 { - bind.resolve(tuple.T2[*Tcp, error](nil, libuvError(libuv.Errno(status)))) - } else { - bind.resolve(tuple.T2[*Tcp, error](bind.tcp, nil)) - } -} - -func Connect(addr *net.SockAddr) async.Future[tuple.Tuple2[*Tcp, error]] { - return async.Async(func(resolve func(tuple.Tuple2[*Tcp, error])) { - tcp := &Tcp{} - if res := libuv.InitTcp(async.Exec().L, &tcp.tcp); res != 0 { - resolve(tuple.T2[*Tcp, error]((*Tcp)(nil), libuvError(res))) - return - } - // req, _ := cbind.Bind1[libuv.Connect](func(status c.Int) { - // if status != 0 { - // resolve(tuple.T2[*Tcp, error]((*Tcp)(nil), libuvError(libuv.Errno(status)))) - // } else { - // resolve(tuple.T2[*Tcp, error](tcp, nil)) - // } - // }) - req := &connectBind{ - tcp: tcp, - resolve: resolve, - } - if res := libuv.TcpConnect(&req.Connect, &req.tcp.tcp, addr, connectCb); res != 0 { - resolve(tuple.T2[*Tcp, error]((*Tcp)(nil), libuvError(res))) - return - } + tcp := &Conn{} + if res := libuv.InitTcp(async.Exec().L, &tcp.tcp); res != 0 { + resolve(tuple.T2[*Conn, error]((*Conn)(nil), libuvError(res))) + return + } + req, cb := cbind.Bind1F[libuv.Connect, libuv.ConnectCb](func(c *libuv.Connect, status c.Int) { + if status != 0 { + resolve(tuple.T2[*Conn, error]((*Conn)(nil), libuvError(libuv.Errno(status)))) + } else { + resolve(tuple.T2[*Conn, error](tcp, nil)) + } + }) + if res := libuv.TcpConnect(req, &tcp.tcp, addr, cb); res != 0 { + resolve(tuple.T2[*Conn, error]((*Conn)(nil), libuvError(res))) + return + } + }) }) } @@ -213,13 +199,13 @@ func allocBuffer(handle *libuv.Handle, suggestedSize uintptr, buf *libuv.Buf) { buf.Len = suggestedSize } -func (t *Tcp) StartRead(fn func(data []byte, err error)) { +func (t *Conn) StartRead(fn func(data []byte, err error)) { t.readCb = func(data []byte, err error) { fn(data, err) } tcp := (*libuv.Stream)(&t.tcp) res := tcp.StartRead(allocBuffer, func(client *libuv.Stream, nread c.Long, buf *libuv.Buf) { - tcp := (*Tcp)(unsafe.Pointer(client)) + tcp := (*Conn)(unsafe.Pointer(client)) if nread > 0 { tcp.readCb(cbind.GoBytes(buf.Base, int(nread)), nil) } else if nread < 0 { @@ -233,7 +219,7 @@ func (t *Tcp) StartRead(fn func(data []byte, err error)) { } } -func (t *Tcp) StopRead() error { +func (t *Conn) StopRead() error { tcp := (*libuv.Stream)(&t.tcp) if res := tcp.StopRead(); res != 0 { return libuvError(libuv.Errno(res)) @@ -242,7 +228,7 @@ func (t *Tcp) StopRead() error { } // Read once from the TCP connection. -func (t *Tcp) Read() async.Future[tuple.Tuple2[[]byte, error]] { +func (t *Conn) Read() async.Future[tuple.Tuple2[[]byte, error]] { return async.Async(func(resolve func(tuple.Tuple2[[]byte, error])) { t.StartRead(func(data []byte, err error) { if err := t.StopRead(); err != nil { @@ -253,7 +239,7 @@ func (t *Tcp) Read() async.Future[tuple.Tuple2[[]byte, error]] { }) } -func (t *Tcp) Write(data []byte) async.Future[error] { +func (t *Conn) Write(data []byte) async.Future[error] { return async.Async(func(resolve func(error)) { writer, _ := cbind.Bind1[libuv.Write](func(req *libuv.Write, status c.Int) { var result error @@ -269,23 +255,6 @@ func (t *Tcp) Write(data []byte) async.Future[error] { }) } -// Don't use this funciton, just for deubg closure problem. -func (t *Tcp) Write1(data []byte) async.Future[error] { - return async.Async(func(resolve func(e error)) { - writer, cb := cbind.Bind1F[libuv.Write, libuv.WriteCb](func(req *libuv.Write, status c.Int) { - if status != 0 { - resolve(libuvError(libuv.Errno(status))) - return - } - resolve(nil) - }) - tcp := (*libuv.Stream)(&t.tcp) - buf, len := cbind.CBuffer(data) - bufs := &libuv.Buf{Base: buf, Len: uintptr(len)} - writer.Write(tcp, bufs, 1, cb) - }) -} - -func (t *Tcp) Close() { +func (t *Conn) Close() { (*libuv.Handle)(unsafe.Pointer(&t.tcp)).Close(nil) }