diff --git a/cl/_testrt/freevars/in.go b/cl/_testrt/freevars/in.go new file mode 100644 index 00000000..35360254 --- /dev/null +++ b/cl/_testrt/freevars/in.go @@ -0,0 +1,14 @@ +package main + +func main() { + func(resolve func(error)) { + func(err error) { + if err != nil { + resolve(err) + return + } + resolve(nil) + }(nil) + }(func(err error) { + }) +} diff --git a/cl/_testrt/freevars/out.ll b/cl/_testrt/freevars/out.ll new file mode 100644 index 00000000..5613d9e8 --- /dev/null +++ b/cl/_testrt/freevars/out.ll @@ -0,0 +1,118 @@ +; ModuleID = 'main' +source_filename = "main" + +%"github.com/goplus/llgo/internal/runtime.iface" = type { ptr, ptr } +%"github.com/goplus/llgo/internal/runtime.eface" = type { ptr, ptr } + +@"main.init$guard" = global i1 false, align 1 +@__llgo_argc = global i32 0, align 4 +@__llgo_argv = global ptr null, align 8 + +define void @main.init() { +_llgo_0: + %0 = load i1, ptr @"main.init$guard", align 1 + br i1 %0, label %_llgo_2, label %_llgo_1 + +_llgo_1: ; preds = %_llgo_0 + store i1 true, ptr @"main.init$guard", align 1 + br label %_llgo_2 + +_llgo_2: ; preds = %_llgo_1, %_llgo_0 + ret void +} + +define i32 @main(i32 %0, ptr %1) { +_llgo_0: + store i32 %0, ptr @__llgo_argc, align 4 + store ptr %1, ptr @__llgo_argv, align 8 + call void @"github.com/goplus/llgo/internal/runtime.init"() + call void @main.init() + %2 = alloca { ptr, ptr }, align 8 + %3 = getelementptr inbounds { ptr, ptr }, ptr %2, i32 0, i32 0 + store ptr @"__llgo_stub.main.main$2", ptr %3, align 8 + %4 = getelementptr inbounds { ptr, ptr }, ptr %2, i32 0, i32 1 + store ptr null, ptr %4, align 8 + %5 = load { ptr, ptr }, ptr %2, align 8 + call void @"main.main$1"({ ptr, ptr } %5) + ret i32 0 +} + +define void @"main.main$1"({ ptr, ptr } %0) { +_llgo_0: + %1 = call ptr @"github.com/goplus/llgo/internal/runtime.AllocZ"(i64 16) + store { ptr, ptr } %0, ptr %1, align 8 + %2 = call ptr @"github.com/goplus/llgo/internal/runtime.AllocU"(i64 8) + %3 = getelementptr inbounds { ptr }, ptr %2, i32 0, i32 0 + store ptr %1, ptr %3, align 8 + %4 = alloca { ptr, ptr }, align 8 + %5 = getelementptr inbounds { ptr, ptr }, ptr %4, i32 0, i32 0 + store ptr @"main.main$1$1", ptr %5, align 8 + %6 = getelementptr inbounds { ptr, ptr }, ptr %4, i32 0, i32 1 + store ptr %2, ptr %6, align 8 + %7 = load { ptr, ptr }, ptr %4, align 8 + %8 = extractvalue { ptr, ptr } %7, 1 + %9 = extractvalue { ptr, ptr } %7, 0 + call void %9(ptr %8, %"github.com/goplus/llgo/internal/runtime.iface" zeroinitializer) + ret void +} + +define void @"main.main$1$1"(ptr %0, %"github.com/goplus/llgo/internal/runtime.iface" %1) { +_llgo_0: + %2 = call ptr @"github.com/goplus/llgo/internal/runtime.IfaceType"(%"github.com/goplus/llgo/internal/runtime.iface" %1) + %3 = extractvalue %"github.com/goplus/llgo/internal/runtime.iface" %1, 1 + %4 = alloca %"github.com/goplus/llgo/internal/runtime.eface", align 8 + %5 = getelementptr inbounds %"github.com/goplus/llgo/internal/runtime.eface", ptr %4, i32 0, i32 0 + store ptr %2, ptr %5, align 8 + %6 = getelementptr inbounds %"github.com/goplus/llgo/internal/runtime.eface", ptr %4, i32 0, i32 1 + store ptr %3, ptr %6, align 8 + %7 = load %"github.com/goplus/llgo/internal/runtime.eface", ptr %4, align 8 + %8 = call ptr @"github.com/goplus/llgo/internal/runtime.IfaceType"(%"github.com/goplus/llgo/internal/runtime.iface" zeroinitializer) + %9 = alloca %"github.com/goplus/llgo/internal/runtime.eface", align 8 + %10 = getelementptr inbounds %"github.com/goplus/llgo/internal/runtime.eface", ptr %9, i32 0, i32 0 + store ptr %8, ptr %10, align 8 + %11 = getelementptr inbounds %"github.com/goplus/llgo/internal/runtime.eface", ptr %9, i32 0, i32 1 + store ptr null, ptr %11, align 8 + %12 = load %"github.com/goplus/llgo/internal/runtime.eface", ptr %9, align 8 + %13 = call i1 @"github.com/goplus/llgo/internal/runtime.EfaceEqual"(%"github.com/goplus/llgo/internal/runtime.eface" %7, %"github.com/goplus/llgo/internal/runtime.eface" %12) + %14 = xor i1 %13, true + br i1 %14, label %_llgo_1, label %_llgo_2 + +_llgo_1: ; preds = %_llgo_0 + %15 = load { ptr }, ptr %0, align 8 + %16 = extractvalue { ptr } %15, 0 + %17 = load { ptr, ptr }, ptr %16, align 8 + %18 = extractvalue { ptr, ptr } %17, 1 + %19 = extractvalue { ptr, ptr } %17, 0 + call void %19(ptr %18, %"github.com/goplus/llgo/internal/runtime.iface" %1) + ret void + +_llgo_2: ; preds = %_llgo_0 + %20 = load { ptr }, ptr %0, align 8 + %21 = extractvalue { ptr } %20, 0 + %22 = load { ptr, ptr }, ptr %21, align 8 + %23 = extractvalue { ptr, ptr } %22, 1 + %24 = extractvalue { ptr, ptr } %22, 0 + call void %24(ptr %23, %"github.com/goplus/llgo/internal/runtime.iface" zeroinitializer) + ret void +} + +define void @"main.main$2"(%"github.com/goplus/llgo/internal/runtime.iface" %0) { +_llgo_0: + ret void +} + +declare void @"github.com/goplus/llgo/internal/runtime.init"() + +define linkonce void @"__llgo_stub.main.main$2"(ptr %0, %"github.com/goplus/llgo/internal/runtime.iface" %1) { +_llgo_0: + tail call void @"main.main$2"(%"github.com/goplus/llgo/internal/runtime.iface" %1) + ret void +} + +declare ptr @"github.com/goplus/llgo/internal/runtime.AllocZ"(i64) + +declare ptr @"github.com/goplus/llgo/internal/runtime.AllocU"(i64) + +declare i1 @"github.com/goplus/llgo/internal/runtime.EfaceEqual"(%"github.com/goplus/llgo/internal/runtime.eface", %"github.com/goplus/llgo/internal/runtime.eface") + +declare ptr @"github.com/goplus/llgo/internal/runtime.IfaceType"(%"github.com/goplus/llgo/internal/runtime.iface") diff --git a/ssa/decl.go b/ssa/decl.go index 96751a37..7d0b6885 100644 --- a/ssa/decl.go +++ b/ssa/decl.go @@ -169,10 +169,11 @@ type aFunction struct { defer_ *aDefer recov BasicBlock - params []Type - freeVars Expr - base int // base = 1 if hasFreeVars; base = 0 otherwise - hasVArg bool + params []Type + freeVars Expr + freeVarsBlock int + base int // base = 1 if hasFreeVars; base = 0 otherwise + hasVArg bool } // Function represents a function or method. @@ -249,12 +250,13 @@ func (p Function) Param(i int) Expr { } func (p Function) closureCtx(b Builder) Expr { - if p.freeVars.IsNil() { + if p.freeVars.IsNil() || (p.freeVarsBlock != 0 && p.freeVarsBlock != b.blk.Index()) { if p.base == 0 { panic("ssa: function has no free variables") } ptr := Expr{p.impl.Param(0), p.params[0]} p.freeVars = b.Load(ptr) + p.freeVarsBlock = b.blk.Index() } return p.freeVars } diff --git a/x/async/_demo/all/all.go b/x/async/_demo/all/all.go new file mode 100644 index 00000000..0f78e648 --- /dev/null +++ b/x/async/_demo/all/all.go @@ -0,0 +1,244 @@ +package main + +import ( + "fmt" + "os" + "time" + + "github.com/goplus/llgo/x/async" + "github.com/goplus/llgo/x/async/timeout" + "github.com/goplus/llgo/x/socketio" + "github.com/goplus/llgo/x/tuple" +) + +func ReadFile(fileName string) async.Future[tuple.Tuple2[[]byte, error]] { + return async.Async(func(resolve func(tuple.Tuple2[[]byte, error])) { + go func() { + println("read file", fileName) + bytes, err := os.ReadFile(fileName) + resolve(tuple.T2(bytes, err)) + }() + }) +} + +func WriteFile(fileName string, content []byte) async.Future[error] { + return async.Async(func(resolve func(error)) { + go func() { + err := os.WriteFile(fileName, content, 0644) + resolve(err) + }() + }) +} + +func sleep(i int, d time.Duration) async.Future[int] { + return async.Async(func(resolve func(int)) { + timeout.Timeout(d)(func(async.Void) { + resolve(i) + }) + }) +} + +func main() { + RunIO() + RunAllAndRace() + RunTimeout() + RunSocket() +} + +func RunIO() { + println("RunIO with Await") + + // Hide `resolve` in Go+ + async.Run(async.Async(func(resolve func(async.Void)) { + println("read file") + defer resolve(async.Void{}) + content, err := async.Await(ReadFile("all.go")).Get() + if err != nil { + fmt.Printf("read err: %v\n", err) + return + } + fmt.Printf("read content: %s\n", content) + err = async.Await(WriteFile("2.out", content)) + if err != nil { + fmt.Printf("write err: %v\n", err) + return + } + fmt.Printf("write done\n") + })) + + // Translated Await to BindIO in Go+: + println("RunIO with BindIO") + + async.Run(async.Async(func(resolve func(async.Void)) { + ReadFile("all.go")(func(v tuple.Tuple2[[]byte, error]) { + content, err := v.Get() + if err != nil { + fmt.Printf("read err: %v\n", err) + resolve(async.Void{}) + return + } + fmt.Printf("read content: %s\n", content) + WriteFile("2.out", content)(func(v error) { + err = v + if err != nil { + fmt.Printf("write err: %v\n", err) + resolve(async.Void{}) + return + } + println("write done") + resolve(async.Void{}) + }) + }) + })) +} + +func RunAllAndRace() { + ms100 := 100 * time.Millisecond + ms200 := 200 * time.Millisecond + ms300 := 300 * time.Millisecond + + println("Run All with Await") + + async.Run(async.Async(func(resolve func(async.Void)) { + async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300))(func(v []int) { + fmt.Printf("All: %v\n", v) + resolve(async.Void{}) + }) + })) + + println("Run Race with Await") + + async.Run(async.Async(func(resolve func(async.Void)) { + first := async.Race(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)) + v := async.Await(first) + fmt.Printf("Race: %v\n", v) + resolve(async.Void{}) + })) + + // Translated to in Go+: + + println("Run All with BindIO") + + async.Run(async.Async(func(resolve func(async.Void)) { + async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300))(func(v []int) { + fmt.Printf("All: %v\n", v) + resolve(async.Void{}) + }) + })) + + println("Run Race with BindIO") + + async.Run(async.Async(func(resolve func(async.Void)) { + async.Race(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300))(func(v int) { + fmt.Printf("Race: %v\n", v) + resolve(async.Void{}) + }) + })) +} + +func RunTimeout() { + println("Run Timeout with Await") + + async.Run(async.Async(func(resolve func(async.Void)) { + fmt.Printf("Start 100 ms timeout\n") + async.Await(timeout.Timeout(100 * time.Millisecond)) + fmt.Printf("timeout\n") + resolve(async.Void{}) + })) + + // Translated to in Go+: + + println("Run Timeout with BindIO") + + async.Run(async.Async(func(resolve func(async.Void)) { + fmt.Printf("Start 100 ms timeout\n") + timeout.Timeout(100 * time.Millisecond)(func(async.Void) { + fmt.Printf("timeout\n") + resolve(async.Void{}) + }) + })) +} + +func RunSocket() { + println("Run Socket") + + async.Run(async.Async(func(resolve func(async.Void)) { + println("RunServer") + + RunServer()(func(async.Void) { + println("RunServer done") + resolve(async.Void{}) + }) + + println("RunClient") + + timeout.Timeout(100 * time.Millisecond)(func(async.Void) { + RunClient()(func(async.Void) { + println("RunClient done") + resolve(async.Void{}) + }) + }) + })) +} + +func RunClient() async.Future[async.Void] { + return async.Async(func(resolve func(async.Void)) { + addr := "127.0.0.1:3927" + socketio.Connect("tcp", addr)(func(v tuple.Tuple2[*socketio.Conn, error]) { + client, err := v.Get() + println("Connected", client, err) + if err != nil { + panic(err) + } + counter := 0 + var loop func(client *socketio.Conn) + loop = func(client *socketio.Conn) { + counter++ + data := fmt.Sprintf("Hello %d", counter) + client.Write([]byte(data))(func(err error) { + if err != nil { + panic(err) + } + client.Read()(func(v tuple.Tuple2[[]byte, error]) { + data, err := v.Get() + if err != nil { + panic(err) + } + println("Read from server:", string(data)) + timeout.Timeout(1 * time.Second)(func(async.Void) { + loop(client) + }) + }) + }) + } + loop(client) + }) + }) +} + +func RunServer() async.Future[async.Void] { + return async.Async(func(resolve func(async.Void)) { + socketio.Listen("tcp", "0.0.0.0:3927", func(client *socketio.Conn, err error) { + println("Client connected", client, err) + var loop func(client *socketio.Conn) + loop = func(client *socketio.Conn) { + client.Read()(func(v tuple.Tuple2[[]byte, error]) { + data, err := v.Get() + if err != nil { + println("Read error", err) + } else { + println("Read from client:", string(data)) + client.Write(data)(func(err error) { + if err != nil { + println("Write error", err) + } else { + loop(client) + } + }) + } + }) + } + loop(client) + }) + }) +} diff --git a/x/async/async.go b/x/async/async.go new file mode 100644 index 00000000..fcd9a2e9 --- /dev/null +++ b/x/async/async.go @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package async + +import ( + _ "unsafe" +) + +type Void = [0]byte + +type Future[T any] func(func(T)) + +// Just for pure LLGo/Go, transpile to callback in Go+ +func Await[T1 any](future Future[T1]) T1 { + return Run(future) +} diff --git a/x/async/async_go.go b/x/async/async_go.go new file mode 100644 index 00000000..c775b7e8 --- /dev/null +++ b/x/async/async_go.go @@ -0,0 +1,67 @@ +//go:build !llgo +// +build !llgo + +/* + * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package async + +import "sync" + +func Async[T any](fn func(func(T))) Future[T] { + return func(chain func(T)) { + go fn(chain) + } +} + +// ----------------------------------------------------------------------------- + +func Race[T1 any](futures ...Future[T1]) Future[T1] { + return Async(func(resolve func(T1)) { + ch := make(chan T1) + for _, future := range futures { + future := future + future(func(v T1) { + defer func() { + // Avoid panic when the channel is closed. + _ = recover() + }() + ch <- v + }) + } + v := <-ch + close(ch) + resolve(v) + }) +} + +func All[T1 any](futures ...Future[T1]) Future[[]T1] { + return Async(func(resolve func([]T1)) { + n := len(futures) + results := make([]T1, n) + wg := sync.WaitGroup{} + wg.Add(n) + for i, future := range futures { + i := i + future(func(v T1) { + results[i] = v + wg.Done() + }) + } + wg.Wait() + resolve(results) + }) +} diff --git a/x/async/async_llgo.go b/x/async/async_llgo.go new file mode 100644 index 00000000..d301b20c --- /dev/null +++ b/x/async/async_llgo.go @@ -0,0 +1,82 @@ +//go:build llgo +// +build llgo + +/* + * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package async + +import ( + "sync/atomic" + + "github.com/goplus/llgo/c/libuv" + "github.com/goplus/llgo/x/cbind" +) + +// Currently Async run chain a future that call chain in the goroutine running `async.Run`. +// TODO(lijie): It would better to switch when needed. +func Async[T any](fn func(func(T))) Future[T] { + return func(chain func(T)) { + loop := Exec().L + + var result T + var a *libuv.Async + var cb libuv.AsyncCb + a, cb = cbind.BindF[libuv.Async, libuv.AsyncCb](func(a *libuv.Async) { + a.Close(nil) + chain(result) + }) + loop.Async(a, cb) + fn(func(v T) { + result = v + a.Send() + }) + } +} + +// ----------------------------------------------------------------------------- + +func Race[T1 any](futures ...Future[T1]) Future[T1] { + return Async(func(resolve func(T1)) { + done := atomic.Bool{} + for _, future := range futures { + future(func(v T1) { + if !done.Swap(true) { + // Just resolve the first one. + resolve(v) + } + }) + } + }) +} + +func All[T1 any](futures ...Future[T1]) Future[[]T1] { + return Async(func(resolve func([]T1)) { + n := len(futures) + results := make([]T1, n) + var done uint32 + for i, future := range futures { + i := i + future(func(v T1) { + results[i] = v + if atomic.AddUint32(&done, 1) == uint32(n) { + // All done. + resolve(results) + } + }) + } + }) +} diff --git a/x/async/executor_go.go b/x/async/executor_go.go new file mode 100644 index 00000000..60962638 --- /dev/null +++ b/x/async/executor_go.go @@ -0,0 +1,30 @@ +//go:build !llgo +// +build !llgo + +/* + * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package async + +func Run[T any](future Future[T]) T { + ch := make(chan T) + go func() { + future(func(v T) { + ch <- v + }) + }() + return <-ch +} diff --git a/x/async/executor_llgo.go b/x/async/executor_llgo.go new file mode 100644 index 00000000..e1e03a6a --- /dev/null +++ b/x/async/executor_llgo.go @@ -0,0 +1,69 @@ +//go:build llgo +// +build llgo + +/* + * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package async + +import ( + "unsafe" + + "github.com/goplus/llgo/c/libuv" + "github.com/goplus/llgo/c/pthread" +) + +var execKey pthread.Key + +func init() { + execKey.Create(nil) +} + +type Executor struct { + L *libuv.Loop +} + +func Exec() *Executor { + v := execKey.Get() + if v == nil { + panic("async.Exec: no executor") + } + return (*Executor)(v) +} + +func setExec(e *Executor) (old *Executor) { + old = (*Executor)(execKey.Get()) + execKey.Set(unsafe.Pointer(e)) + return +} + +func (e *Executor) Run() { + e.L.Run(libuv.RUN_DEFAULT) +} + +func Run[T any](future Future[T]) T { + loop := libuv.LoopNew() + exec := &Executor{loop} + oldExec := setExec(exec) + var ret T + future(func(v T) { + ret = v + }) + exec.Run() + loop.Close() + setExec(oldExec) + return ret +} diff --git a/x/async/timeout/timeout_go.go b/x/async/timeout/timeout_go.go new file mode 100644 index 00000000..32839079 --- /dev/null +++ b/x/async/timeout/timeout_go.go @@ -0,0 +1,35 @@ +//go:build !llgo +// +build !llgo + +/* + * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package timeout + +import ( + "time" + + "github.com/goplus/llgo/x/async" +) + +func Timeout(d time.Duration) async.Future[async.Void] { + return async.Async(func(resolve func(async.Void)) { + go func() { + time.Sleep(d) + resolve(async.Void{}) + }() + }) +} diff --git a/x/async/timeout/timeout_llgo.go b/x/async/timeout/timeout_llgo.go new file mode 100644 index 00000000..ed18ce26 --- /dev/null +++ b/x/async/timeout/timeout_llgo.go @@ -0,0 +1,44 @@ +//go:build llgo +// +build llgo + +/* + * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package timeout + +import ( + "time" + + "github.com/goplus/llgo/c/libuv" + "github.com/goplus/llgo/x/async" + "github.com/goplus/llgo/x/cbind" +) + +func Timeout(d time.Duration) async.Future[async.Void] { + return async.Async(func(resolve func(async.Void)) { + t, cb := cbind.BindF[libuv.Timer, libuv.TimerCb](func(t *libuv.Timer) { + resolve(async.Void{}) + }) + r := libuv.InitTimer(async.Exec().L, t) + if r != 0 { + panic("InitTimer failed") + } + r = t.Start(cb, uint64(d/time.Millisecond), 0) + if r != 0 { + panic("Start failed") + } + }) +} diff --git a/x/cbind/buf.go b/x/cbind/buf.go new file mode 100644 index 00000000..b142d6d3 --- /dev/null +++ b/x/cbind/buf.go @@ -0,0 +1,16 @@ +package cbind + +import "unsafe" + +type slice struct { + data unsafe.Pointer + len int +} + +func GoBytes(buf *int8, n int) []byte { + return *(*[]byte)(unsafe.Pointer(&slice{unsafe.Pointer(buf), n})) +} + +func CBuffer(data []byte) (*int8, int) { + return (*int8)(unsafe.Pointer(&data[0])), len(data) +} diff --git a/x/cbind/cbind.go b/x/cbind/cbind.go new file mode 100644 index 00000000..7ec34b99 --- /dev/null +++ b/x/cbind/cbind.go @@ -0,0 +1,143 @@ +/* + * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cbind + +import ( + "unsafe" +) + +// llgo:type C +type Cb[T any] func(*T) + +// llgo:type C +type Cb1[T any, A any] func(*T, A) + +// llgo:type C +type Cb2[T any, A any, B any] func(*T, A, B) + +// llgo:type C +type Cb3[T any, A any, B any, C any] func(*T, A, B, C) + +type bind[Base any] struct { + b Base + fn func(*Base) +} + +type bind1[Base any, A any] struct { + b Base + fn func(*Base, A) +} + +type bind2[Base any, A any, B any] struct { + b Base + fn func(*Base, A, B) +} + +type bind3[Base any, A any, B any, C any] struct { + b Base + fn func(*Base, A, B, C) +} + +func callback[Base any](base *Base) { + bind := (*bind[Base])(unsafe.Pointer(base)) + bind.fn(base) +} + +func callback1[Base any, A any](base *Base, a A) { + bind := (*bind1[Base, A])(unsafe.Pointer(base)) + bind.fn(base, a) +} + +func callback2[Base any, A any, B any](base *Base, a A, b B) { + bind := (*bind2[Base, A, B])(unsafe.Pointer(base)) + bind.fn(base, a, b) +} + +func callback3[Base any, A any, B any, C any](base *Base, a A, b B, c C) { + bind := (*bind3[Base, A, B, C])(unsafe.Pointer(base)) + bind.fn(base, a, b, c) +} + +/** + * Bind[N] binds a Go function to a C callback function. + * + * Example: + * + * timer, cb := cbind.Bind[libuv.Timer](func() { + * println("hello") + * }) + * libuv.InitTimer(async.Exec().L, timer) + * timer.Start(cb, 1000, 0) + * + * @param call The Go function to bind. + * @return The data pointer and the C callback function. + */ +func Bind[T any](call func(*T)) (p *T, cb Cb[T]) { + bb := &bind[T]{fn: call} + p = (*T)(unsafe.Pointer(bb)) + cb = callback[T] + return +} + +func BindF[T any, F ~func(*T)](call func(*T)) (*T, F) { + bb := &bind[T]{fn: call} + p := (*T)(unsafe.Pointer(bb)) + var fn F = callback[T] + return p, fn +} + +func Bind1[T any, A any](call func(*T, A)) (p *T, cb Cb1[T, A]) { + bb := &bind1[T, A]{fn: call} + p = (*T)(unsafe.Pointer(bb)) + cb = callback1[T, A] + return +} + +func Bind1F[T any, F ~func(*T, A), A any](call func(*T, A)) (*T, F) { + bb := &bind1[T, A]{fn: call} + p := (*T)(unsafe.Pointer(bb)) + var fn F = callback1[T, A] + return p, fn +} + +func Bind2[T any, A any, B any](call func(*T, A, B)) (p *T, cb Cb2[T, A, B]) { + bb := &bind2[T, A, B]{fn: call} + p = (*T)(unsafe.Pointer(bb)) + cb = callback2[T, A, B] + return +} + +func Bind2F[T any, F ~func(*T, A, B), A any, B any](call func(*T, A, B)) (*T, F) { + bb := &bind2[T, A, B]{fn: call} + p := (*T)(unsafe.Pointer(bb)) + var fn F = callback2[T, A, B] + return p, fn +} + +func Bind3[T any, A any, B any, C any](call func(*T, A, B, C), a A, b B, c C) (p *T, cb Cb3[T, A, B, C]) { + bb := &bind3[T, A, B, C]{fn: call} + p = (*T)(unsafe.Pointer(bb)) + cb = callback3[T, A, B, C] + return +} + +func Bind3F[T any, F ~func(*T, A, B, C), A any, B any, C any](call func(*T, A, B, C), a A, b B, c C) (*T, F) { + bb := &bind3[T, A, B, C]{fn: call} + p := (*T)(unsafe.Pointer(bb)) + var fn F = callback3[T, A, B, C] + return p, fn +} diff --git a/x/io/_demo/asyncdemo/async.go b/x/io/_demo/asyncdemo/async.go deleted file mode 100644 index 02b4475a..00000000 --- a/x/io/_demo/asyncdemo/async.go +++ /dev/null @@ -1,290 +0,0 @@ -package main - -import ( - "encoding/json" - "fmt" - "log" - "time" - - "github.com/goplus/llgo/x/io" -) - -// ----------------------------------------------------------------------------- - -type Response struct { - StatusCode int - - mockBody string -} - -func (r *Response) mock(body string) { - r.mockBody = body -} - -func (r *Response) Text() (resolve io.Promise[string]) { - resolve(r.mockBody, nil) - return -} - -func (r *Response) TextCompiled() *io.PromiseImpl[string] { - P := &io.PromiseImpl[string]{} - P.Func = func(resolve func(string, error)) { - for { - switch P.Prev = P.Next; P.Prev { - case 0: - resolve(r.mockBody, nil) - P.Next = -1 - return - default: - panic("Promise already done") - } - } - } - return P -} - -func HttpGet(url string, callback func(resp *Response, err error)) { - resp := &Response{StatusCode: 200} - callback(resp, nil) -} - -func AsyncHttpGet(url string) (resolve io.Promise[*Response]) { - HttpGet(url, resolve) - return -} - -func AsyncHttpGetCompiled(url string) *io.PromiseImpl[*Response] { - P := &io.PromiseImpl[*Response]{} - P.Func = func(resolve func(*Response, error)) { - for { - switch P.Prev = P.Next; P.Prev { - case 0: - HttpGet(url, resolve) - P.Next = -1 - return - default: - panic("Promise already done") - } - } - } - return P -} - -// ----------------------------------------------------------------------------- - -type User struct { - Name string -} - -func GetUser(uid string) (resolve io.Promise[User]) { - resp, err := AsyncHttpGet("http://example.com/user/" + uid).Await() - if err != nil { - resolve(User{}, err) - return - } - - if resp.StatusCode != 200 { - resolve(User{}, fmt.Errorf("http status code: %d", resp.StatusCode)) - return - } - - resp.mock(`{"name":"Alice"}`) - - body, err := resp.Text().Await() - if err != nil { - resolve(User{}, err) - return - } - user := User{} - if err := json.Unmarshal([]byte(body), &user); err != nil { - resolve(User{}, err) - return - } - - resolve(user, nil) - return -} - -func GetUserCompiled(uid string) *io.PromiseImpl[User] { - var state1 *io.PromiseImpl[*Response] - var state2 *io.PromiseImpl[string] - - P := &io.PromiseImpl[User]{} - P.Func = func(resolve func(User, error)) { - for { - switch P.Prev = P.Next; P.Prev { - case 0: - state1 = AsyncHttpGetCompiled("http://example.com/user/" + uid) - P.Next = 1 - return - case 1: - state1.EnsureDone() - resp, err := state1.Value, state1.Err - if err != nil { - resolve(User{}, err) - return - } - - if resp.StatusCode != 200 { - resolve(User{}, fmt.Errorf("http status code: %d", resp.StatusCode)) - return - } - - resp.mock(`{"name":"Alice"}`) - - state2 = resp.TextCompiled() - P.Next = 2 - return - case 2: - state2.EnsureDone() - body, err := state2.Value, state2.Err - if err != nil { - resolve(User{}, err) - return - } - user := User{} - if err := json.Unmarshal([]byte(body), &user); err != nil { - resolve(User{}, err) - return - } - - resolve(user, nil) - P.Next = -1 - return - default: - panic("Promise already done") - } - } - } - return P -} - -func GetScore() *io.Promise[float64] { - panic("todo: GetScore") -} - -func GetScoreCompiled() *io.PromiseImpl[float64] { - P := &io.PromiseImpl[float64]{} - P.Func = func(resolve func(float64, error)) { - for { - switch P.Prev = P.Next; P.Prev { - case 0: - panic("todo: GetScore") - default: - panic("Promise already done") - } - } - } - return P -} - -func DoUpdate(op string) *io.Promise[io.Void] { - panic("todo: DoUpdate") -} - -func DoUpdateCompiled(op string) *io.PromiseImpl[io.Void] { - P := &io.PromiseImpl[io.Void]{} - P.Func = func(resolve func(io.Void, error)) { - for { - switch P.Prev = P.Next; P.Prev { - case 0: - panic("todo: DoUpdate") - default: - panic("Promise already done") - } - } - } - return P -} - -func Demo() (resolve io.Promise[io.Void]) { - user, err := GetUser("123").Await() - log.Println(user, err) - - user, err = io.Race[User](GetUser("123"), GetUser("456"), GetUser("789")).Await() - log.Println(user, err) - - users, err := io.All[User]([]io.AsyncCall[User]{GetUser("123"), GetUser("456"), GetUser("789")}).Await() - log.Println(users, err) - - user, score, _, err := io.Await3[User, float64, io.Void](GetUser("123"), GetScore(), DoUpdate("update sth.")) - log.Println(user, score, err) - - // TODO(lijie): select from multiple promises without channel - select { - case user := <-GetUser("123").Chan(): - log.Println("user:", user) - case score := <-GetScore().Chan(): - log.Println("score:", score) - case <-io.Timeout(5 * time.Second).Chan(): - log.Println("timeout") - } - return -} - -func DemoCompiled() *io.PromiseImpl[io.Void] { - var state1 *io.PromiseImpl[User] - var state2 *io.PromiseImpl[User] - var state3 *io.PromiseImpl[[]User] - var state4 *io.PromiseImpl[io.Await3Result[User, float64, io.Void]] - - P := &io.PromiseImpl[io.Void]{} - P.Func = func(resolve func(io.Void, error)) { - for { - switch P.Prev = P.Next; P.Prev { - case 0: - state1 = GetUserCompiled("123") - P.Next = 1 - return - case 1: - state1.EnsureDone() - user, err := state1.Value, state1.Err - log.Printf("user: %v, err: %v\n", user, err) - - state2 = io.Race[User](GetUserCompiled("123"), GetUserCompiled("456"), GetUserCompiled("789")) - P.Next = 2 - return - case 2: - state2.EnsureDone() - user, err := state2.Value, state2.Err - log.Println(user, err) - - state3 = io.All[User]([]io.AsyncCall[User]{GetUserCompiled("123"), GetUserCompiled("456"), GetUserCompiled("789")}) - P.Next = 3 - return - case 3: - state3.EnsureDone() - users, err := state3.Value, state3.Err - log.Println(users, err) - - state4 = io.Await3Compiled[User, float64, io.Void](GetUserCompiled("123"), GetScoreCompiled(), DoUpdateCompiled("update sth.")) - P.Next = 4 - return - case 4: - state4.EnsureDone() - user, score, _, err := state4.Value.V1, state4.Value.V2, state4.Value.V3, state4.Value.Err - log.Println(user, score, err) - - select { - case user := <-GetUserCompiled("123").Chan(): - log.Println("user:", user) - case score := <-GetScoreCompiled().Chan(): - log.Println("score:", score) - case <-io.TimeoutCompiled(5 * time.Second).Chan(): - log.Println("timeout") - } - P.Next = -1 - return - default: - panic("Promise already done") - } - } - } - return P -} - -func main() { - log.SetFlags(log.Lshortfile | log.LstdFlags) - // io.Run(Demo()) - io.Run(DemoCompiled()) -} diff --git a/x/io/io.go b/x/io/io.go deleted file mode 100644 index f4b4c18b..00000000 --- a/x/io/io.go +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io - -import ( - _ "unsafe" - - "time" -) - -const ( - LLGoPackage = "decl" -) - -type Void = [0]byte - -// ----------------------------------------------------------------------------- - -type AsyncCall[OutT any] interface { - Await(timeout ...time.Duration) (ret OutT, err error) - Chan() <-chan OutT - EnsureDone() -} - -// llgo:link AsyncCall.Await llgo.await -func Await[OutT any](call AsyncCall[OutT], timeout ...time.Duration) (ret OutT, err error) { - return -} - -//go:linkname Timeout llgo.timeout -func Timeout(time.Duration) (ret AsyncCall[Void]) - -func TimeoutCompiled(d time.Duration) *PromiseImpl[Void] { - P := &PromiseImpl[Void]{} - P.Func = func(resolve func(Void, error)) { - go func() { - time.Sleep(d) - resolve(Void{}, nil) - }() - } - return P -} - -// llgo:link Race llgo.race -func Race[OutT any](acs ...AsyncCall[OutT]) (ret *PromiseImpl[OutT]) { - return -} - -func All[OutT any](acs []AsyncCall[OutT]) (ret *PromiseImpl[[]OutT]) { - return nil -} - -// llgo:link Await2 llgo.await -func Await2[OutT1, OutT2 any]( - ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], - timeout ...time.Duration) (ret1 OutT1, ret2 OutT2, err error) { - return -} - -type Await2Result[T1 any, T2 any] struct { - V1 T1 - V2 T2 - Err error -} - -func Await2Compiled[OutT1, OutT2 any]( - ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], - timeout ...time.Duration) (ret *PromiseImpl[Await2Result[OutT1, OutT2]]) { - return -} - -// llgo:link Await3 llgo.await -func Await3[OutT1, OutT2, OutT3 any]( - ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], ac3 AsyncCall[OutT3], - timeout ...time.Duration) (ret1 OutT1, ret2 OutT2, ret3 OutT3, err error) { - return -} - -type Await3Result[T1 any, T2 any, T3 any] struct { - V1 T1 - V2 T2 - V3 T3 - Err error -} - -func Await3Compiled[OutT1, OutT2, OutT3 any]( - ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], ac3 AsyncCall[OutT3], - timeout ...time.Duration) (ret *PromiseImpl[Await3Result[OutT1, OutT2, OutT3]]) { - return -} - -func Run(ac AsyncCall[Void]) { - p := ac.(*PromiseImpl[Void]) - p.Resume() - <-ac.Chan() -} - -// ----------------------------------------------------------------------------- - -type Promise[OutT any] func(OutT, error) - -// llgo:link Promise.Await llgo.await -func (p Promise[OutT]) Await(timeout ...time.Duration) (ret OutT, err error) { - return -} - -func (p Promise[OutT]) Chan() <-chan OutT { - return nil -} - -func (p Promise[OutT]) EnsureDone() { - -} - -// ----------------------------------------------------------------------------- - -type PromiseImpl[TOut any] struct { - Func func(resolve func(TOut, error)) - Value TOut - Err error - Prev int - Next int - - c chan TOut -} - -func (p *PromiseImpl[TOut]) Resume() { - p.Func(func(v TOut, err error) { - p.Value = v - p.Err = err - }) -} - -func (p *PromiseImpl[TOut]) EnsureDone() { - if p.Next == -1 { - panic("Promise already done") - } -} - -func (p *PromiseImpl[TOut]) Chan() <-chan TOut { - if p.c == nil { - p.c = make(chan TOut, 1) - p.Func(func(v TOut, err error) { - p.Value = v - p.Err = err - p.c <- v - }) - } - return p.c -} - -func (p *PromiseImpl[TOut]) Await(timeout ...time.Duration) (ret TOut, err error) { - panic("should not called") -} - -// ----------------------------------------------------------------------------- diff --git a/x/io/README.md b/x/socketio/README.md similarity index 80% rename from x/io/README.md rename to x/socketio/README.md index 9681b3ab..da6a4c8e 100644 --- a/x/io/README.md +++ b/x/socketio/README.md @@ -367,98 +367,23 @@ In some situations, you may want to get the first result of multiple async opera ## Design -Introduce `Promise` type to represent an asynchronous operation and its resulting value. `Promise` can be resolved with a value with an error. `Promise` can be awaited to get the value and error. - -`Promise` just a type indicating the asynchronous operation, it can't be created and assigned directly. It be replaced to `PromiseImpl` by the LLGo compiler. +Introduce `async.IO[T]` type to represent an asynchronous operation, `async.Future[T]` type to represent the result of an asynchronous operation. `async.IO[T]` can be `bind` to a function that accepts `T` as an argument to chain multiple asynchronous operations. `async.IO[T]` can be `await` to get the value of the asynchronous operation. ```go -// Some native async functions -func timeoutAsync(d time.Duration, cb func()) { - go func() { - time.Sleep(d) - cb() - }() -} +package async -// Wrap callback-based async function into Promise -func resolveAfter1Second() (resolve Promise[string]) { - timeoutAsync(1 * time.Second, func() { - resolve("Resolved after 1 second", nil) - }) -} +type Future[T any] func() T +type IO[T any] func() Future[T] -// Compiled to: -func resolveAfter1Second() (resolve PromiseImpl[string]) { - promise := io.NewPromiseImpl[string](resolve func(value string, err error) { - resolve: func(value string, err error) { - for true { - switch (promise.prev = promise.next) { - case 0: - timeoutAsync(1 * time.Second, func() { - resolve("Resolved after 1 second", nil) - }) - } - } - }, - } - return promise -} - -func asyncCall() (resolve Promise[string]) { - str, err := resolveAfter1Second().Await() - resolve("AsyncCall: " + str, err) -} - -// Compiled to: -func asyncCall() (resolve PromiseImpl[string]) { - promise := io.NewPromiseImpl[string](resolve func(value string, err error) { - for true { - switch (promise.prev = promise.next) { - case 0: - resolveAfter1Second() - return - case 1: - str, err := promise.value, promise.err - resolve("AsyncCall: " + str, err) - return - } +func main() { + io := func() Future[string] { + return func() string { + return "Hello, World!" } - }) - return promise -} + } -// Directly return Promise -func asyncCall2() Promise[string] { - return resolveAfter1Second() -} - -// Compiled to: -func asyncCall2() PromiseImpl[string] { - return resolveAfter1Second() -} - -// Don't wait for Promise to complete -func asyncCall3() { - resolveAfter1Second().Then(func(result string) { - fmt.Println("AsyncCall3: " + result) - }) -} - -func asyncMain() { - fmt.Println("Starting AsyncCall") - result1 := asyncCall().Await() - fmt.Println(result1) - - fmt.Println("Starting AsyncCall2") - result2 := asyncCall2().Await() - fmt.Println(result2) - - fmt.Println("Starting AsyncCall3") - asyncCall3() - - // Wait for AsyncCall3 to complete - time.Sleep(2 * time.Second) - - fmt.Println("Main function completed") + future := io() + value := future() + println(value) } ``` diff --git a/x/socketio/socketio_go.go b/x/socketio/socketio_go.go new file mode 100644 index 00000000..9a32e433 --- /dev/null +++ b/x/socketio/socketio_go.go @@ -0,0 +1,92 @@ +//go:build !llgo +// +build !llgo + +/* + * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package socketio + +import ( + "net" + + "github.com/goplus/llgo/x/async" + "github.com/goplus/llgo/x/tuple" +) + +type Conn struct { + conn net.Conn +} + +func Listen(protocol, bindAddr string, listenCb func(client *Conn, err error)) { + go func() { + listener, err := net.Listen(protocol, bindAddr) + if err != nil { + listenCb(nil, err) + return + } + for { + conn, err := listener.Accept() + if err != nil { + listenCb(nil, err) + return + } + listenCb(&Conn{conn: conn}, nil) + } + }() +} + +func Connect(network, addr string) async.Future[tuple.Tuple2[*Conn, error]] { + return async.Async(func(resolve func(tuple.Tuple2[*Conn, error])) { + go func() { + conn, err := net.Dial(network, addr) + if err != nil { + resolve(tuple.T2[*Conn, error](nil, err)) + return + } + resolve(tuple.T2[*Conn, error](&Conn{conn: conn}, nil)) + }() + }) +} + +// Read once from the TCP connection. +func (t *Conn) Read() async.Future[tuple.Tuple2[[]byte, error]] { + return async.Async(func(resolve func(tuple.Tuple2[[]byte, error])) { + go func() { + buf := make([]byte, 1024) + n, err := t.conn.Read(buf) + if err != nil { + resolve(tuple.T2[[]byte, error](nil, err)) + return + } + resolve(tuple.T2[[]byte, error](buf[:n], nil)) + }() + }) +} + +func (t *Conn) Write(data []byte) async.Future[error] { + return async.Async(func(resolve func(error)) { + go func() { + _, err := t.conn.Write(data) + resolve(err) + }() + }) +} + +func (t *Conn) Close() { + if t.conn != nil { + t.conn.Close() + } +} diff --git a/x/socketio/socketio_llgo.go b/x/socketio/socketio_llgo.go new file mode 100644 index 00000000..e6b799a5 --- /dev/null +++ b/x/socketio/socketio_llgo.go @@ -0,0 +1,260 @@ +//go:build llgo +// +build llgo + +/* + * Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package socketio + +import ( + "strings" + "syscall" + "unsafe" + _ "unsafe" + + "github.com/goplus/llgo/c" + "github.com/goplus/llgo/c/libuv" + "github.com/goplus/llgo/c/net" + "github.com/goplus/llgo/x/async" + "github.com/goplus/llgo/x/cbind" + "github.com/goplus/llgo/x/tuple" +) + +type Listener struct { + tcp libuv.Tcp + listenCb func(server *Listener, err error) +} + +type Conn struct { + tcp libuv.Tcp + readCb func([]byte, error) +} + +type libuvError libuv.Errno + +func (e libuvError) Error() string { + s := libuv.Strerror(libuv.Errno(e)) + return c.GoString(s, c.Strlen(s)) +} + +type getAddrInfoBind struct { + libuv.GetAddrInfo + resolve func(tuple.Tuple2[*net.SockAddr, error]) +} + +func getAddrInfoCb(p *libuv.GetAddrInfo, status c.Int, addr *net.AddrInfo) { + bind := (*getAddrInfoBind)(unsafe.Pointer(p)) + if status != 0 { + bind.resolve(tuple.T2[*net.SockAddr, error](nil, libuvError(status))) + return + } + bind.resolve(tuple.T2[*net.SockAddr, error](addr.Addr, nil)) +} + +func parseAddr(addr string) async.Future[tuple.Tuple2[*net.SockAddr, error]] { + return async.Async(func(resolve func(tuple.Tuple2[*net.SockAddr, error])) { + host := "127.0.0.1" + var port string + // split host and service by last colon + idx := strings.LastIndex(addr, ":") + if idx < 0 { + port = addr + } else { + host = addr[:idx] + port = addr[idx+1:] + } + + hints := &net.AddrInfo{ + Family: net.AF_INET, + SockType: net.SOCK_STREAM, + Protocol: syscall.IPPROTO_TCP, + Flags: 0, + } + + req, cb := cbind.Bind2F[libuv.GetAddrInfo, libuv.GetaddrinfoCb](func(i *libuv.GetAddrInfo, status c.Int, addr *net.AddrInfo) { + if status != 0 { + resolve(tuple.T2[*net.SockAddr, error](nil, libuvError(status))) + return + } + resolve(tuple.T2[*net.SockAddr, error](addr.Addr, nil)) + }) + if res := libuv.Getaddrinfo(async.Exec().L, req, cb, c.AllocaCStr(host), c.AllocaCStr(port), hints); res != 0 { + resolve(tuple.T2[*net.SockAddr, error](nil, libuvError(res))) + return + } + }) +} + +func Listen(protocol, bindAddr string, listenCb func(client *Conn, err error)) { + tcp, err := newListener() + if err != nil { + listenCb(nil, err) + return + } + parseAddr(bindAddr)(func(v tuple.Tuple2[*net.SockAddr, error]) { + addr, err := v.Get() + if err != nil { + listenCb(nil, err) + return + } + if err := tcp.bind(addr, 0); err != nil { + listenCb(nil, err) + return + } + if err := tcp.listen(128, func(server *Listener, err error) { + client, err := server.accept() + listenCb(client, err) + }); err != nil { + listenCb(nil, err) + } + }) +} + +func newListener() (*Listener, error) { + t := &Listener{} + if res := libuv.InitTcp(async.Exec().L, &t.tcp); res != 0 { + return nil, libuvError(res) + } + return t, nil +} + +func (t *Listener) bind(addr *net.SockAddr, flags uint) error { + if res := t.tcp.Bind(addr, c.Uint(flags)); res != 0 { + return libuvError(res) + } + return nil +} + +func (l *Listener) listen(backlog int, cb func(server *Listener, err error)) error { + l.listenCb = cb + res := (*libuv.Stream)(&l.tcp).Listen(c.Int(backlog), func(s *libuv.Stream, status c.Int) { + server := (*Listener)(unsafe.Pointer(s)) + if status != 0 { + server.listenCb(server, libuvError(libuv.Errno(status))) + } else { + server.listenCb(server, nil) + } + }) + if res != 0 { + return libuvError(res) + } + return nil +} + +func (l *Listener) accept() (client *Conn, err error) { + tcp := &Conn{} + if res := libuv.InitTcp(async.Exec().L, &tcp.tcp); res != 0 { + return nil, libuvError(res) + } + if res := (*libuv.Stream)(&l.tcp).Accept((*libuv.Stream)(&tcp.tcp)); res != 0 { + return nil, libuvError(res) + } + return tcp, nil +} + +func Connect(network, addr string) async.Future[tuple.Tuple2[*Conn, error]] { + return async.Async(func(resolve func(tuple.Tuple2[*Conn, error])) { + parseAddr(addr)(func(v tuple.Tuple2[*net.SockAddr, error]) { + addr, err := v.Get() + if err != nil { + resolve(tuple.T2[*Conn, error]((*Conn)(nil), err)) + return + } + + tcp := &Conn{} + if res := libuv.InitTcp(async.Exec().L, &tcp.tcp); res != 0 { + resolve(tuple.T2[*Conn, error]((*Conn)(nil), libuvError(res))) + return + } + req, cb := cbind.Bind1F[libuv.Connect, libuv.ConnectCb](func(c *libuv.Connect, status c.Int) { + if status != 0 { + resolve(tuple.T2[*Conn, error]((*Conn)(nil), libuvError(libuv.Errno(status)))) + } else { + resolve(tuple.T2[*Conn, error](tcp, nil)) + } + }) + if res := libuv.TcpConnect(req, &tcp.tcp, addr, cb); res != 0 { + resolve(tuple.T2[*Conn, error]((*Conn)(nil), libuvError(res))) + return + } + }) + }) +} + +func allocBuffer(handle *libuv.Handle, suggestedSize uintptr, buf *libuv.Buf) { + buf.Base = (*c.Char)(c.Malloc(suggestedSize)) + buf.Len = suggestedSize +} + +func (t *Conn) StartRead(fn func(data []byte, err error)) { + t.readCb = func(data []byte, err error) { + fn(data, err) + } + tcp := (*libuv.Stream)(&t.tcp) + res := tcp.StartRead(allocBuffer, func(client *libuv.Stream, nread c.Long, buf *libuv.Buf) { + tcp := (*Conn)(unsafe.Pointer(client)) + if nread > 0 { + tcp.readCb(cbind.GoBytes(buf.Base, int(nread)), nil) + } else if nread < 0 { + tcp.readCb(nil, libuvError(libuv.Errno(nread))) + } else { + tcp.readCb(nil, nil) + } + }) + if res != 0 { + t.readCb(nil, libuvError(libuv.Errno(res))) + } +} + +func (t *Conn) StopRead() error { + tcp := (*libuv.Stream)(&t.tcp) + if res := tcp.StopRead(); res != 0 { + return libuvError(libuv.Errno(res)) + } + return nil +} + +// Read once from the TCP connection. +func (t *Conn) Read() async.Future[tuple.Tuple2[[]byte, error]] { + return async.Async(func(resolve func(tuple.Tuple2[[]byte, error])) { + t.StartRead(func(data []byte, err error) { + if err := t.StopRead(); err != nil { + panic(err) + } + resolve(tuple.T2[[]byte, error](data, err)) + }) + }) +} + +func (t *Conn) Write(data []byte) async.Future[error] { + return async.Async(func(resolve func(error)) { + writer, cb := cbind.Bind1F[libuv.Write, libuv.WriteCb](func(req *libuv.Write, status c.Int) { + var result error + if status != 0 { + result = libuvError(libuv.Errno(status)) + } + resolve(result) + }) + tcp := (*libuv.Stream)(&t.tcp) + buf, len := cbind.CBuffer(data) + bufs := &libuv.Buf{Base: buf, Len: uintptr(len)} + writer.Write(tcp, bufs, 1, cb) + }) +} + +func (t *Conn) Close() { + (*libuv.Handle)(unsafe.Pointer(&t.tcp)).Close(nil) +} diff --git a/x/tuple/tuple.go b/x/tuple/tuple.go new file mode 100644 index 00000000..1fbea192 --- /dev/null +++ b/x/tuple/tuple.go @@ -0,0 +1,40 @@ +package tuple + +type Tuple[T any] struct { + v T +} + +func T[T any](v T) Tuple[T] { + return Tuple[T]{v: v} +} + +func (t Tuple[T]) Get() T { + return t.v +} + +type Tuple2[T1 any, T2 any] struct { + v1 T1 + v2 T2 +} + +func T2[T1 any, T2 any](v1 T1, v2 T2) Tuple2[T1, T2] { + return Tuple2[T1, T2]{v1: v1, v2: v2} +} + +func (t Tuple2[T1, T2]) Get() (T1, T2) { + return t.v1, t.v2 +} + +type Tuple3[T1 any, T2 any, T3 any] struct { + v1 T1 + v2 T2 + v3 T3 +} + +func T3[T1 any, T2 any, T3 any](v1 T1, v2 T2, v3 T3) Tuple3[T1, T2, T3] { + return Tuple3[T1, T2, T3]{v1: v1, v2: v2, v3: v3} +} + +func (t Tuple3[T1, T2, T3]) Get() (T1, T2, T3) { + return t.v1, t.v2, t.v3 +}