From 1a158b5de321c9ad0996dd88d27801a85a280750 Mon Sep 17 00:00:00 2001 From: Li Jie Date: Wed, 4 Sep 2024 17:08:08 +0800 Subject: [PATCH] async: work both go and llgo --- x/async/_demo/monad/monad.go | 41 +++++--- x/async/async.go | 103 ++------------------ x/async/async_go.go | 91 +++++++++++++++++ x/async/async_llgo.go | 113 ++++++++++++++++++++++ x/async/executor_go.go | 33 +++++++ x/async/{executor.go => executor_llgo.go} | 4 + x/async/timeout/timeout_go.go | 35 +++++++ x/async/timeout/timeout_llgo.go | 44 +++++++++ x/cbind/buf.go | 12 +++ x/cbind/cbind.go | 65 ++++++++++--- x/io/io.go | 51 +++++----- 11 files changed, 441 insertions(+), 151 deletions(-) create mode 100644 x/async/async_go.go create mode 100644 x/async/async_llgo.go create mode 100644 x/async/executor_go.go rename x/async/{executor.go => executor_llgo.go} (96%) create mode 100644 x/async/timeout/timeout_go.go create mode 100644 x/async/timeout/timeout_llgo.go create mode 100644 x/cbind/buf.go diff --git a/x/async/_demo/monad/monad.go b/x/async/_demo/monad/monad.go index 336e91ea..4e326932 100644 --- a/x/async/_demo/monad/monad.go +++ b/x/async/_demo/monad/monad.go @@ -5,7 +5,6 @@ import ( "os" "time" - "github.com/goplus/llgo/c" "github.com/goplus/llgo/x/async" "github.com/goplus/llgo/x/async/timeout" "github.com/goplus/llgo/x/tuple" @@ -31,10 +30,9 @@ func WriteFile(fileName string, content []byte) async.IO[error] { func sleep(i int, d time.Duration) async.IO[int] { return async.Async(func(resolve func(int)) { - go func() { - c.Usleep(c.Uint(d.Microseconds())) + async.BindIO(timeout.Timeout(d), func(async.Void) { resolve(i) - }() + }) }) } @@ -46,6 +44,8 @@ func main() { } func RunIO() { + println("RunIO with Await") + async.Run(func() { content, err := async.Await(ReadFile("1.txt")).Get() if err != nil { @@ -62,6 +62,7 @@ func RunIO() { }) // Translated to in Go+: + println("RunIO with BindIO") async.Run(func() { async.BindIO(ReadFile("1.txt"), func(v tuple.Tuple2[[]byte, error]) { @@ -84,30 +85,42 @@ func RunIO() { } func RunAllAndRace() { + ms100 := 100 * time.Millisecond + ms200 := 200 * time.Millisecond + ms300 := 300 * time.Millisecond + + println("Run All with Await") + async.Run(func() { - all := async.All(sleep(1, time.Second), sleep(2, time.Second*2), sleep(3, time.Second*3)) + all := async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)) async.BindIO(all, func(v []int) { fmt.Printf("All: %v\n", v) }) }) + println("Run Race with Await") + async.Run(func() { - first := async.Race(sleep(1, time.Second), sleep(2, time.Second*2), sleep(3, time.Second*3)) + first := async.Race(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)) v := async.Await(first) fmt.Printf("Race: %v\n", v) }) // Translated to in Go+: + println("Run All with BindIO") + async.Run(func() { - all := async.All(sleep(1, time.Second), sleep(2, time.Second*2), sleep(3, time.Second*3)) + all := async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)) async.BindIO(all, func(v []int) { fmt.Printf("All: %v\n", v) }) }) + println("Run Race with BindIO") + async.Run(func() { - first := async.Race(sleep(1, time.Second), sleep(2, time.Second*2), sleep(3, time.Second*3)) + first := async.Race(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)) async.BindIO(first, func(v int) { fmt.Printf("Race: %v\n", v) }) @@ -115,17 +128,21 @@ func RunAllAndRace() { } func RunTimeout() { + println("Run Timeout with Await") + async.Run(func() { - fmt.Printf("Start 1 second timeout\n") - async.Await(timeout.Timeout(1 * time.Second)) + fmt.Printf("Start 100 ms timeout\n") + async.Await(timeout.Timeout(100 * time.Millisecond)) fmt.Printf("timeout\n") }) // Translated to in Go+: + println("Run Timeout with BindIO") + async.Run(func() { - fmt.Printf("Start 1 second timeout\n") - async.BindIO(timeout.Timeout(1*time.Second), func(async.Void) { + fmt.Printf("Start 100 ms timeout\n") + async.BindIO(timeout.Timeout(100*time.Millisecond), func(async.Void) { fmt.Printf("timeout\n") }) }) diff --git a/x/async/async.go b/x/async/async.go index cdf1d43a..8f7f0d64 100644 --- a/x/async/async.go +++ b/x/async/async.go @@ -17,11 +17,7 @@ package async import ( - "context" - "unsafe" _ "unsafe" - - "github.com/goplus/llgo/c/libuv" ) type Void = [0]byte @@ -30,16 +26,13 @@ type Future[T any] func() T type IO[T any] func(e *AsyncContext) Future[T] -type Chain[T any] func(callback func(T)) - -func (f Future[T]) Do(callback func(T)) { - callback(f()) +type AsyncContext struct { + *Executor + complete func() } -type AsyncContext struct { - context.Context - *Executor - Complete func() +func (ctx *AsyncContext) Complete() { + ctx.complete() } func Async[T any](fn func(resolve func(T))) IO[T] { @@ -53,93 +46,9 @@ func Async[T any](fn func(resolve func(T))) IO[T] { }) return func() T { if !done { - panic("AsyncIO: Future accessed before completion") + panic("async.Async: Future accessed before completion") } return result } } } - -type bindAsync struct { - libuv.Async - cb func() -} - -func BindIO[T any](call IO[T], callback func(T)) { - loop := Exec().L - a := &bindAsync{} - loop.Async(&a.Async, func(p *libuv.Async) { - (*bindAsync)(unsafe.Pointer(p)).cb() - }) - ctx := &AsyncContext{ - Context: context.Background(), - Executor: Exec(), - Complete: func() { - a.Async.Send() - }, - } - f := call(ctx) - a.cb = func() { - a.Async.Close(nil) - result := f() - callback(result) - } -} - -// ----------------------------------------------------------------------------- - -func Await[T1 any](call IO[T1]) (ret T1) { - ch := make(chan struct{}) - f := call(&AsyncContext{ - Context: context.Background(), - Executor: Exec(), - Complete: func() { - close(ch) - }, - }) - <-ch - return f() -} - -func Race[T1 any](calls ...IO[T1]) IO[T1] { - return Async(func(resolve func(T1)) { - done := false - for _, call := range calls { - var f Future[T1] - f = call(&AsyncContext{ - Context: context.Background(), - Executor: Exec(), - Complete: func() { - if done { - return - } - done = true - resolve(f()) - }, - }) - } - }) -} - -func All[T1 any](calls ...IO[T1]) IO[[]T1] { - return Async(func(resolve func([]T1)) { - n := len(calls) - results := make([]T1, n) - done := 0 - for i, call := range calls { - i := i - var f Future[T1] - f = call(&AsyncContext{ - Context: context.Background(), - Executor: Exec(), - Complete: func() { - results[i] = f() - done++ - if done == n { - resolve(results) - } - }, - }) - } - }) -} diff --git a/x/async/async_go.go b/x/async/async_go.go new file mode 100644 index 00000000..77d0cada --- /dev/null +++ b/x/async/async_go.go @@ -0,0 +1,91 @@ +//go:build !llgo +// +build !llgo + +/* + * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package async + +import "sync" + +func BindIO[T any](call IO[T], callback func(T)) { + callback(Await(call)) +} + +func Await[T1 any](call IO[T1]) (ret T1) { + ch := make(chan struct{}) + f := call(&AsyncContext{ + Executor: Exec(), + complete: func() { + close(ch) + }, + }) + <-ch + return f() +} + +// ----------------------------------------------------------------------------- + +func Race[T1 any](calls ...IO[T1]) IO[T1] { + return Async(func(resolve func(T1)) { + ch := make(chan int, len(calls)) + futures := make([]Future[T1], len(calls)) + for i, call := range calls { + i := i + call := call + go func() { + f := call(&AsyncContext{ + Executor: Exec(), + complete: func() { + defer func() { + _ = recover() + }() + ch <- i + }, + }) + futures[i] = f + }() + } + i := <-ch + close(ch) + resolve(futures[i]()) + }) +} + +func All[T1 any](calls ...IO[T1]) IO[[]T1] { + return Async(func(resolve func([]T1)) { + n := len(calls) + results := make([]T1, n) + futures := make([]Future[T1], n) + wg := sync.WaitGroup{} + wg.Add(n) + for i, call := range calls { + i := i + f := call(&AsyncContext{ + Executor: Exec(), + complete: func() { + wg.Done() + }, + }) + futures[i] = f + } + wg.Wait() + for i, f := range futures { + results[i] = f() + } + resolve(results) + }) +} diff --git a/x/async/async_llgo.go b/x/async/async_llgo.go new file mode 100644 index 00000000..5b63434a --- /dev/null +++ b/x/async/async_llgo.go @@ -0,0 +1,113 @@ +//go:build llgo +// +build llgo + +/* + * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package async + +import ( + "sync/atomic" + "unsafe" + + "github.com/goplus/llgo/c/libuv" +) + +type bindAsync struct { + libuv.Async + cb func() +} + +func BindIO[T any](call IO[T], callback func(T)) { + loop := Exec().L + a := &bindAsync{} + loop.Async(&a.Async, func(p *libuv.Async) { + (*bindAsync)(unsafe.Pointer(p)).cb() + }) + done := atomic.Bool{} + ctx := &AsyncContext{ + Executor: Exec(), + complete: func() { + done.Store(true) + a.Async.Send() + }, + } + f := call(ctx) + called := false + a.cb = func() { + if called { + return + } + a.Async.Close(nil) + result := f() + callback(result) + } + // don't delay the callback if the future is already done + if done.Load() { + called = true + a.cb() + } +} + +func Await[T1 any](call IO[T1]) (ret T1) { + BindIO(call, func(v T1) { + ret = v + }) + return +} + +// ----------------------------------------------------------------------------- + +func Race[T1 any](calls ...IO[T1]) IO[T1] { + return Async(func(resolve func(T1)) { + done := false + for _, call := range calls { + var f Future[T1] + f = call(&AsyncContext{ + Executor: Exec(), + complete: func() { + if done { + return + } + done = true + resolve(f()) + }, + }) + } + }) +} + +func All[T1 any](calls ...IO[T1]) IO[[]T1] { + return Async(func(resolve func([]T1)) { + n := len(calls) + results := make([]T1, n) + done := 0 + for i, call := range calls { + i := i + var f Future[T1] + f = call(&AsyncContext{ + Executor: Exec(), + complete: func() { + results[i] = f() + done++ + if done == n { + resolve(results) + } + }, + }) + } + }) +} diff --git a/x/async/executor_go.go b/x/async/executor_go.go new file mode 100644 index 00000000..95151eeb --- /dev/null +++ b/x/async/executor_go.go @@ -0,0 +1,33 @@ +//go:build !llgo +// +build !llgo + +/* + * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package async + +var exec = &Executor{} + +type Executor struct { +} + +func Exec() *Executor { + return exec +} + +func Run(fn func()) { + fn() +} diff --git a/x/async/executor.go b/x/async/executor_llgo.go similarity index 96% rename from x/async/executor.go rename to x/async/executor_llgo.go index 3306b541..10611185 100644 --- a/x/async/executor.go +++ b/x/async/executor_llgo.go @@ -1,3 +1,6 @@ +//go:build llgo +// +build llgo + /* * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. * @@ -56,4 +59,5 @@ func Run(fn func()) { fn() exec.Run() loop.Close() + setExec(nil) } diff --git a/x/async/timeout/timeout_go.go b/x/async/timeout/timeout_go.go new file mode 100644 index 00000000..b58121f2 --- /dev/null +++ b/x/async/timeout/timeout_go.go @@ -0,0 +1,35 @@ +//go:build !llgo +// +build !llgo + +/* + * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package timeout + +import ( + "time" + + "github.com/goplus/llgo/x/async" +) + +func Timeout(d time.Duration) async.IO[async.Void] { + return async.Async(func(resolve func(async.Void)) { + go func() { + time.Sleep(d) + resolve(async.Void{}) + }() + }) +} diff --git a/x/async/timeout/timeout_llgo.go b/x/async/timeout/timeout_llgo.go new file mode 100644 index 00000000..42ed91e5 --- /dev/null +++ b/x/async/timeout/timeout_llgo.go @@ -0,0 +1,44 @@ +//go:build llgo +// +build llgo + +/* + * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package timeout + +import ( + "time" + + "github.com/goplus/llgo/c/libuv" + "github.com/goplus/llgo/x/async" + "github.com/goplus/llgo/x/cbind" +) + +func Timeout(d time.Duration) async.IO[async.Void] { + return async.Async(func(resolve func(async.Void)) { + t, _ := cbind.Bind[libuv.Timer](func() { + resolve(async.Void{}) + }) + r := libuv.InitTimer(async.Exec().L, t) + if r != 0 { + panic("InitTimer failed") + } + r = t.Start(cbind.Callback[libuv.Timer], uint64(d/time.Millisecond), 0) + if r != 0 { + panic("Start failed") + } + }) +} diff --git a/x/cbind/buf.go b/x/cbind/buf.go new file mode 100644 index 00000000..3e1f2e67 --- /dev/null +++ b/x/cbind/buf.go @@ -0,0 +1,12 @@ +package cbind + +import "unsafe" + +type slice struct { + data unsafe.Pointer + len int +} + +func GoBytes(buf *int8, n int) []byte { + return *(*[]byte)(unsafe.Pointer(&slice{unsafe.Pointer(buf), n})) +} diff --git a/x/cbind/cbind.go b/x/cbind/cbind.go index 60e6d24a..b34cb05a 100644 --- a/x/cbind/cbind.go +++ b/x/cbind/cbind.go @@ -16,7 +16,21 @@ package cbind -import "unsafe" +import ( + "unsafe" +) + +// llgo:type C +type Cb[T any] func(*T) + +// llgo:type C +type Cb1[T any, A any] func(*T, A) + +// llgo:type C +type Cb2[T any, A any, B any] func(*T, A, B) + +// llgo:type C +type Cb3[T any, A any, B any, C any] func(*T, A, B, C) type bind[Base any] struct { b Base @@ -38,50 +52,75 @@ type bind3[Base any, A any, B any, C any] struct { fn func(A, B, C) } -func callback[Base any](b *Base) { +func Callback[Base any](b *Base) { bind := (*bind[Base])(unsafe.Pointer(b)) bind.fn() } -func callback1[Base any, A any](b *Base, a A) { +func Callback1[Base any, A any](b *Base, a A) { bind := (*bind1[Base, A])(unsafe.Pointer(b)) bind.fn(a) } -func callback2[Base any, A any, B any](b *Base, a A, c B) { +func Callback2[Base any, A any, B any](b *Base, a A, c B) { bind := (*bind2[Base, A, B])(unsafe.Pointer(b)) bind.fn(a, c) } -func callback3[Base any, A any, B any, C any](b *Base, a A, c B, d C) { +func Callback3[Base any, A any, B any, C any](b *Base, a A, c B, d C) { bind := (*bind3[Base, A, B, C])(unsafe.Pointer(b)) bind.fn(a, c, d) } -func Bind[T any](call func()) (p *T, fn func(*T)) { +/** + * Bind[N] binds a Go function to a C callback function. + * + * Example: + * + * timer, cb := cbind.Bind[libuv.Timer](func() { + * println("hello") + * }) + * libuv.InitTimer(async.Exec().L, timer) + * timer.Start(cb, 1000, 0) + * + * TODO(lijie): fn isn't a C func-ptr, it's closure, should fix the LLGo compiler. + * See: https://github.com/goplus/llgo/issues/766 + * + * Workaround: + * + * timer, _ := cbind.Bind[libuv.Timer](func() { + * println("hello") + * }) + * libuv.InitTimer(async.Exec().L, timer) + * timer.Start(cbind.Callback[libuv.Timer], 1000, 0) + * + * @param call The Go function to bind. + * @return The data pointer and the C callback function. + */ +func Bind[T any](call func()) (p *T, fn Cb[T]) { bb := &bind[T]{fn: func() { call() }} p = (*T)(unsafe.Pointer(bb)) - fn = callback[T] + fn = Callback[T] return } -func Bind1[T any, A any](call func(A)) (p *T, fn func(*T, A)) { +func Bind1[T any, A any](call func(A)) (p *T, fn Cb1[T, A]) { bb := &bind1[T, A]{fn: func(a A) { call(a) }} p = (*T)(unsafe.Pointer(bb)) - fn = callback1[T, A] + fn = Callback1[T, A] return } -func Bind2[T any, A any, B any](call func(A, B)) (p *T, fn func(*T, A, B)) { +func Bind2[T any, A any, B any](call func(A, B)) (p *T, fn Cb2[T, A, B]) { bb := &bind2[T, A, B]{fn: func(a A, b B) { call(a, b) }} p = (*T)(unsafe.Pointer(bb)) - fn = callback2[T, A, B] + fn = Callback2[T, A, B] return } -func Bind3[T any, A any, B any, C any](call func(A, B, C), a A, b B, c C) (p *T, fn func(*T, A, B, C)) { +func Bind3[T any, A any, B any, C any](call func(A, B, C), a A, b B, c C) (p *T, fn Cb3[T, A, B, C]) { bb := &bind3[T, A, B, C]{fn: func(a A, b B, c C) { call(a, b, c) }} p = (*T)(unsafe.Pointer(bb)) - fn = callback3[T, A, B, C] + fn = Callback3[T, A, B, C] return } diff --git a/x/io/io.go b/x/io/io.go index 053da3bf..bc866b68 100644 --- a/x/io/io.go +++ b/x/io/io.go @@ -1,3 +1,6 @@ +//go:build llgo +// +build llgo + /* * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. * @@ -29,7 +32,7 @@ import ( ) type Tcp struct { - tcp *libuv.Tcp + tcp libuv.Tcp } type libuvError libuv.Errno @@ -40,8 +43,8 @@ func (e libuvError) Error() string { } func NewTcp() *Tcp { - t := &Tcp{&libuv.Tcp{}} - libuv.InitTcp(async.Exec().L, t.tcp) + t := &Tcp{} + libuv.InitTcp(async.Exec().L, &t.tcp) return t } @@ -52,41 +55,40 @@ func (t *Tcp) Bind(addr *net.SockAddr, flags uint) error { return nil } -func (t *Tcp) Listen(backlog int, cb libuv.ConnectionCb) error { - if res := (*libuv.Stream)(t.tcp).Listen(c.Int(backlog), cb); res != 0 { +func (t *Tcp) Listen(backlog int, cb func(server *libuv.Stream, status c.Int)) error { + if res := (*libuv.Stream)(&t.tcp).Listen(c.Int(backlog), cb); res != 0 { return libuvError(res) } return nil } func (t *Tcp) Accept() (client *Tcp, err error) { - tcp := &libuv.Tcp{} - if res := libuv.InitTcp(async.Exec().L, tcp); res != 0 { + tcp := &Tcp{} + if res := libuv.InitTcp(async.Exec().L, &tcp.tcp); res != 0 { return nil, libuvError(res) } - if res := (*libuv.Stream)(t.tcp).Accept((*libuv.Stream)(client.tcp)); res != 0 { + if res := (*libuv.Stream)(&t.tcp).Accept((*libuv.Stream)(&tcp.tcp)); res != 0 { return nil, libuvError(res) } - return &Tcp{tcp}, nil + return tcp, nil } func Connect(addr *net.SockAddr) async.IO[tuple.Tuple2[*Tcp, error]] { return async.Async(func(resolve func(tuple.Tuple2[*Tcp, error])) { - tcp := &libuv.Tcp{} - if res := libuv.InitTcp(async.Exec().L, tcp); res != 0 { + tcp := &Tcp{} + if res := libuv.InitTcp(async.Exec().L, &tcp.tcp); res != 0 { resolve(tuple.T2[*Tcp, error]((*Tcp)(nil), libuvError(res))) return } - req, cb := cbind.Bind1[libuv.Connect](func(status c.Int) { + req, _ := cbind.Bind1[libuv.Connect](func(status c.Int) { if status != 0 { resolve(tuple.T2[*Tcp, error]((*Tcp)(nil), libuvError(libuv.Errno(status)))) - } else { - resolve(tuple.T2[*Tcp, error](&Tcp{tcp}, nil)) } }) - if res := libuv.TcpConnect(req, tcp, addr, cb); res != 0 { + if res := libuv.TcpConnect(req, &tcp.tcp, addr, cbind.Callback1[libuv.Connect, c.Int]); res != 0 { resolve(tuple.T2[*Tcp, error]((*Tcp)(nil), libuvError(res))) } + resolve(tuple.T2[*Tcp, error](tcp, nil)) }) } @@ -95,23 +97,14 @@ func allocBuffer(handle *libuv.Handle, suggestedSize uintptr, buf *libuv.Buf) { buf.Len = suggestedSize } -type slice struct { - data unsafe.Pointer - len int -} - -func goBytes(buf *int8, n int) []byte { - return *(*[]byte)(unsafe.Pointer(&slice{unsafe.Pointer(buf), n})) -} - func (t *Tcp) Read() async.IO[tuple.Tuple2[[]byte, error]] { return func(ctx *async.AsyncContext) async.Future[tuple.Tuple2[[]byte, error]] { var result tuple.Tuple2[[]byte, error] var done bool - tcp := (*libuv.Stream)(t.tcp) - libuv.ReadStart(tcp, allocBuffer, func(client *libuv.Stream, nread c.Long, buf *libuv.Buf) { + tcp := (*libuv.Stream)(&t.tcp) + tcp.StartRead(allocBuffer, func(client *libuv.Stream, nread c.Long, buf *libuv.Buf) { if nread > 0 { - result = tuple.T2[[]byte, error](goBytes(buf.Base, int(nread)), nil) + result = tuple.T2[[]byte, error](cbind.GoBytes(buf.Base, int(nread)), nil) } else if nread < 0 { result = tuple.T2[[]byte, error](nil, libuvError(libuv.Errno(nread))) } else { @@ -129,6 +122,6 @@ func (t *Tcp) Read() async.IO[tuple.Tuple2[[]byte, error]] { } } -func (t *Tcp) Close(cb libuv.CloseCb) { - (*libuv.Handle)(unsafe.Pointer(t.tcp)).Close(cb) +func (t *Tcp) Close() { + (*libuv.Handle)(unsafe.Pointer(&t.tcp)).Close(nil) }