Merge pull request #778 from cpunion/future-io

Future I/O
This commit is contained in:
xushiwei
2024-09-08 17:37:33 +08:00
committed by GitHub
19 changed files with 1303 additions and 552 deletions

14
cl/_testrt/freevars/in.go Normal file
View File

@@ -0,0 +1,14 @@
package main
func main() {
func(resolve func(error)) {
func(err error) {
if err != nil {
resolve(err)
return
}
resolve(nil)
}(nil)
}(func(err error) {
})
}

118
cl/_testrt/freevars/out.ll Normal file
View File

@@ -0,0 +1,118 @@
; ModuleID = 'main'
source_filename = "main"
%"github.com/goplus/llgo/internal/runtime.iface" = type { ptr, ptr }
%"github.com/goplus/llgo/internal/runtime.eface" = type { ptr, ptr }
@"main.init$guard" = global i1 false, align 1
@__llgo_argc = global i32 0, align 4
@__llgo_argv = global ptr null, align 8
define void @main.init() {
_llgo_0:
%0 = load i1, ptr @"main.init$guard", align 1
br i1 %0, label %_llgo_2, label %_llgo_1
_llgo_1: ; preds = %_llgo_0
store i1 true, ptr @"main.init$guard", align 1
br label %_llgo_2
_llgo_2: ; preds = %_llgo_1, %_llgo_0
ret void
}
define i32 @main(i32 %0, ptr %1) {
_llgo_0:
store i32 %0, ptr @__llgo_argc, align 4
store ptr %1, ptr @__llgo_argv, align 8
call void @"github.com/goplus/llgo/internal/runtime.init"()
call void @main.init()
%2 = alloca { ptr, ptr }, align 8
%3 = getelementptr inbounds { ptr, ptr }, ptr %2, i32 0, i32 0
store ptr @"__llgo_stub.main.main$2", ptr %3, align 8
%4 = getelementptr inbounds { ptr, ptr }, ptr %2, i32 0, i32 1
store ptr null, ptr %4, align 8
%5 = load { ptr, ptr }, ptr %2, align 8
call void @"main.main$1"({ ptr, ptr } %5)
ret i32 0
}
define void @"main.main$1"({ ptr, ptr } %0) {
_llgo_0:
%1 = call ptr @"github.com/goplus/llgo/internal/runtime.AllocZ"(i64 16)
store { ptr, ptr } %0, ptr %1, align 8
%2 = call ptr @"github.com/goplus/llgo/internal/runtime.AllocU"(i64 8)
%3 = getelementptr inbounds { ptr }, ptr %2, i32 0, i32 0
store ptr %1, ptr %3, align 8
%4 = alloca { ptr, ptr }, align 8
%5 = getelementptr inbounds { ptr, ptr }, ptr %4, i32 0, i32 0
store ptr @"main.main$1$1", ptr %5, align 8
%6 = getelementptr inbounds { ptr, ptr }, ptr %4, i32 0, i32 1
store ptr %2, ptr %6, align 8
%7 = load { ptr, ptr }, ptr %4, align 8
%8 = extractvalue { ptr, ptr } %7, 1
%9 = extractvalue { ptr, ptr } %7, 0
call void %9(ptr %8, %"github.com/goplus/llgo/internal/runtime.iface" zeroinitializer)
ret void
}
define void @"main.main$1$1"(ptr %0, %"github.com/goplus/llgo/internal/runtime.iface" %1) {
_llgo_0:
%2 = call ptr @"github.com/goplus/llgo/internal/runtime.IfaceType"(%"github.com/goplus/llgo/internal/runtime.iface" %1)
%3 = extractvalue %"github.com/goplus/llgo/internal/runtime.iface" %1, 1
%4 = alloca %"github.com/goplus/llgo/internal/runtime.eface", align 8
%5 = getelementptr inbounds %"github.com/goplus/llgo/internal/runtime.eface", ptr %4, i32 0, i32 0
store ptr %2, ptr %5, align 8
%6 = getelementptr inbounds %"github.com/goplus/llgo/internal/runtime.eface", ptr %4, i32 0, i32 1
store ptr %3, ptr %6, align 8
%7 = load %"github.com/goplus/llgo/internal/runtime.eface", ptr %4, align 8
%8 = call ptr @"github.com/goplus/llgo/internal/runtime.IfaceType"(%"github.com/goplus/llgo/internal/runtime.iface" zeroinitializer)
%9 = alloca %"github.com/goplus/llgo/internal/runtime.eface", align 8
%10 = getelementptr inbounds %"github.com/goplus/llgo/internal/runtime.eface", ptr %9, i32 0, i32 0
store ptr %8, ptr %10, align 8
%11 = getelementptr inbounds %"github.com/goplus/llgo/internal/runtime.eface", ptr %9, i32 0, i32 1
store ptr null, ptr %11, align 8
%12 = load %"github.com/goplus/llgo/internal/runtime.eface", ptr %9, align 8
%13 = call i1 @"github.com/goplus/llgo/internal/runtime.EfaceEqual"(%"github.com/goplus/llgo/internal/runtime.eface" %7, %"github.com/goplus/llgo/internal/runtime.eface" %12)
%14 = xor i1 %13, true
br i1 %14, label %_llgo_1, label %_llgo_2
_llgo_1: ; preds = %_llgo_0
%15 = load { ptr }, ptr %0, align 8
%16 = extractvalue { ptr } %15, 0
%17 = load { ptr, ptr }, ptr %16, align 8
%18 = extractvalue { ptr, ptr } %17, 1
%19 = extractvalue { ptr, ptr } %17, 0
call void %19(ptr %18, %"github.com/goplus/llgo/internal/runtime.iface" %1)
ret void
_llgo_2: ; preds = %_llgo_0
%20 = load { ptr }, ptr %0, align 8
%21 = extractvalue { ptr } %20, 0
%22 = load { ptr, ptr }, ptr %21, align 8
%23 = extractvalue { ptr, ptr } %22, 1
%24 = extractvalue { ptr, ptr } %22, 0
call void %24(ptr %23, %"github.com/goplus/llgo/internal/runtime.iface" zeroinitializer)
ret void
}
define void @"main.main$2"(%"github.com/goplus/llgo/internal/runtime.iface" %0) {
_llgo_0:
ret void
}
declare void @"github.com/goplus/llgo/internal/runtime.init"()
define linkonce void @"__llgo_stub.main.main$2"(ptr %0, %"github.com/goplus/llgo/internal/runtime.iface" %1) {
_llgo_0:
tail call void @"main.main$2"(%"github.com/goplus/llgo/internal/runtime.iface" %1)
ret void
}
declare ptr @"github.com/goplus/llgo/internal/runtime.AllocZ"(i64)
declare ptr @"github.com/goplus/llgo/internal/runtime.AllocU"(i64)
declare i1 @"github.com/goplus/llgo/internal/runtime.EfaceEqual"(%"github.com/goplus/llgo/internal/runtime.eface", %"github.com/goplus/llgo/internal/runtime.eface")
declare ptr @"github.com/goplus/llgo/internal/runtime.IfaceType"(%"github.com/goplus/llgo/internal/runtime.iface")

View File

@@ -169,10 +169,11 @@ type aFunction struct {
defer_ *aDefer
recov BasicBlock
params []Type
freeVars Expr
base int // base = 1 if hasFreeVars; base = 0 otherwise
hasVArg bool
params []Type
freeVars Expr
freeVarsBlock int
base int // base = 1 if hasFreeVars; base = 0 otherwise
hasVArg bool
}
// Function represents a function or method.
@@ -249,12 +250,13 @@ func (p Function) Param(i int) Expr {
}
func (p Function) closureCtx(b Builder) Expr {
if p.freeVars.IsNil() {
if p.freeVars.IsNil() || (p.freeVarsBlock != 0 && p.freeVarsBlock != b.blk.Index()) {
if p.base == 0 {
panic("ssa: function has no free variables")
}
ptr := Expr{p.impl.Param(0), p.params[0]}
p.freeVars = b.Load(ptr)
p.freeVarsBlock = b.blk.Index()
}
return p.freeVars
}

244
x/async/_demo/all/all.go Normal file
View File

@@ -0,0 +1,244 @@
package main
import (
"fmt"
"os"
"time"
"github.com/goplus/llgo/x/async"
"github.com/goplus/llgo/x/async/timeout"
"github.com/goplus/llgo/x/socketio"
"github.com/goplus/llgo/x/tuple"
)
func ReadFile(fileName string) async.Future[tuple.Tuple2[[]byte, error]] {
return async.Async(func(resolve func(tuple.Tuple2[[]byte, error])) {
go func() {
println("read file", fileName)
bytes, err := os.ReadFile(fileName)
resolve(tuple.T2(bytes, err))
}()
})
}
func WriteFile(fileName string, content []byte) async.Future[error] {
return async.Async(func(resolve func(error)) {
go func() {
err := os.WriteFile(fileName, content, 0644)
resolve(err)
}()
})
}
func sleep(i int, d time.Duration) async.Future[int] {
return async.Async(func(resolve func(int)) {
timeout.Timeout(d)(func(async.Void) {
resolve(i)
})
})
}
func main() {
RunIO()
RunAllAndRace()
RunTimeout()
RunSocket()
}
func RunIO() {
println("RunIO with Await")
// Hide `resolve` in Go+
async.Run(async.Async(func(resolve func(async.Void)) {
println("read file")
defer resolve(async.Void{})
content, err := async.Await(ReadFile("all.go")).Get()
if err != nil {
fmt.Printf("read err: %v\n", err)
return
}
fmt.Printf("read content: %s\n", content)
err = async.Await(WriteFile("2.out", content))
if err != nil {
fmt.Printf("write err: %v\n", err)
return
}
fmt.Printf("write done\n")
}))
// Translated Await to BindIO in Go+:
println("RunIO with BindIO")
async.Run(async.Async(func(resolve func(async.Void)) {
ReadFile("all.go")(func(v tuple.Tuple2[[]byte, error]) {
content, err := v.Get()
if err != nil {
fmt.Printf("read err: %v\n", err)
resolve(async.Void{})
return
}
fmt.Printf("read content: %s\n", content)
WriteFile("2.out", content)(func(v error) {
err = v
if err != nil {
fmt.Printf("write err: %v\n", err)
resolve(async.Void{})
return
}
println("write done")
resolve(async.Void{})
})
})
}))
}
func RunAllAndRace() {
ms100 := 100 * time.Millisecond
ms200 := 200 * time.Millisecond
ms300 := 300 * time.Millisecond
println("Run All with Await")
async.Run(async.Async(func(resolve func(async.Void)) {
async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300))(func(v []int) {
fmt.Printf("All: %v\n", v)
resolve(async.Void{})
})
}))
println("Run Race with Await")
async.Run(async.Async(func(resolve func(async.Void)) {
first := async.Race(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300))
v := async.Await(first)
fmt.Printf("Race: %v\n", v)
resolve(async.Void{})
}))
// Translated to in Go+:
println("Run All with BindIO")
async.Run(async.Async(func(resolve func(async.Void)) {
async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300))(func(v []int) {
fmt.Printf("All: %v\n", v)
resolve(async.Void{})
})
}))
println("Run Race with BindIO")
async.Run(async.Async(func(resolve func(async.Void)) {
async.Race(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300))(func(v int) {
fmt.Printf("Race: %v\n", v)
resolve(async.Void{})
})
}))
}
func RunTimeout() {
println("Run Timeout with Await")
async.Run(async.Async(func(resolve func(async.Void)) {
fmt.Printf("Start 100 ms timeout\n")
async.Await(timeout.Timeout(100 * time.Millisecond))
fmt.Printf("timeout\n")
resolve(async.Void{})
}))
// Translated to in Go+:
println("Run Timeout with BindIO")
async.Run(async.Async(func(resolve func(async.Void)) {
fmt.Printf("Start 100 ms timeout\n")
timeout.Timeout(100 * time.Millisecond)(func(async.Void) {
fmt.Printf("timeout\n")
resolve(async.Void{})
})
}))
}
func RunSocket() {
println("Run Socket")
async.Run(async.Async(func(resolve func(async.Void)) {
println("RunServer")
RunServer()(func(async.Void) {
println("RunServer done")
resolve(async.Void{})
})
println("RunClient")
timeout.Timeout(100 * time.Millisecond)(func(async.Void) {
RunClient()(func(async.Void) {
println("RunClient done")
resolve(async.Void{})
})
})
}))
}
func RunClient() async.Future[async.Void] {
return async.Async(func(resolve func(async.Void)) {
addr := "127.0.0.1:3927"
socketio.Connect("tcp", addr)(func(v tuple.Tuple2[*socketio.Conn, error]) {
client, err := v.Get()
println("Connected", client, err)
if err != nil {
panic(err)
}
counter := 0
var loop func(client *socketio.Conn)
loop = func(client *socketio.Conn) {
counter++
data := fmt.Sprintf("Hello %d", counter)
client.Write([]byte(data))(func(err error) {
if err != nil {
panic(err)
}
client.Read()(func(v tuple.Tuple2[[]byte, error]) {
data, err := v.Get()
if err != nil {
panic(err)
}
println("Read from server:", string(data))
timeout.Timeout(1 * time.Second)(func(async.Void) {
loop(client)
})
})
})
}
loop(client)
})
})
}
func RunServer() async.Future[async.Void] {
return async.Async(func(resolve func(async.Void)) {
socketio.Listen("tcp", "0.0.0.0:3927", func(client *socketio.Conn, err error) {
println("Client connected", client, err)
var loop func(client *socketio.Conn)
loop = func(client *socketio.Conn) {
client.Read()(func(v tuple.Tuple2[[]byte, error]) {
data, err := v.Get()
if err != nil {
println("Read error", err)
} else {
println("Read from client:", string(data))
client.Write(data)(func(err error) {
if err != nil {
println("Write error", err)
} else {
loop(client)
}
})
}
})
}
loop(client)
})
})
}

30
x/async/async.go Normal file
View File

@@ -0,0 +1,30 @@
/*
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package async
import (
_ "unsafe"
)
type Void = [0]byte
type Future[T any] func(func(T))
// Just for pure LLGo/Go, transpile to callback in Go+
func Await[T1 any](future Future[T1]) T1 {
return Run(future)
}

67
x/async/async_go.go Normal file
View File

@@ -0,0 +1,67 @@
//go:build !llgo
// +build !llgo
/*
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package async
import "sync"
func Async[T any](fn func(func(T))) Future[T] {
return func(chain func(T)) {
go fn(chain)
}
}
// -----------------------------------------------------------------------------
func Race[T1 any](futures ...Future[T1]) Future[T1] {
return Async(func(resolve func(T1)) {
ch := make(chan T1)
for _, future := range futures {
future := future
future(func(v T1) {
defer func() {
// Avoid panic when the channel is closed.
_ = recover()
}()
ch <- v
})
}
v := <-ch
close(ch)
resolve(v)
})
}
func All[T1 any](futures ...Future[T1]) Future[[]T1] {
return Async(func(resolve func([]T1)) {
n := len(futures)
results := make([]T1, n)
wg := sync.WaitGroup{}
wg.Add(n)
for i, future := range futures {
i := i
future(func(v T1) {
results[i] = v
wg.Done()
})
}
wg.Wait()
resolve(results)
})
}

82
x/async/async_llgo.go Normal file
View File

@@ -0,0 +1,82 @@
//go:build llgo
// +build llgo
/*
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package async
import (
"sync/atomic"
"github.com/goplus/llgo/c/libuv"
"github.com/goplus/llgo/x/cbind"
)
// Currently Async run chain a future that call chain in the goroutine running `async.Run`.
// TODO(lijie): It would better to switch when needed.
func Async[T any](fn func(func(T))) Future[T] {
return func(chain func(T)) {
loop := Exec().L
var result T
var a *libuv.Async
var cb libuv.AsyncCb
a, cb = cbind.BindF[libuv.Async, libuv.AsyncCb](func(a *libuv.Async) {
a.Close(nil)
chain(result)
})
loop.Async(a, cb)
fn(func(v T) {
result = v
a.Send()
})
}
}
// -----------------------------------------------------------------------------
func Race[T1 any](futures ...Future[T1]) Future[T1] {
return Async(func(resolve func(T1)) {
done := atomic.Bool{}
for _, future := range futures {
future(func(v T1) {
if !done.Swap(true) {
// Just resolve the first one.
resolve(v)
}
})
}
})
}
func All[T1 any](futures ...Future[T1]) Future[[]T1] {
return Async(func(resolve func([]T1)) {
n := len(futures)
results := make([]T1, n)
var done uint32
for i, future := range futures {
i := i
future(func(v T1) {
results[i] = v
if atomic.AddUint32(&done, 1) == uint32(n) {
// All done.
resolve(results)
}
})
}
})
}

30
x/async/executor_go.go Normal file
View File

@@ -0,0 +1,30 @@
//go:build !llgo
// +build !llgo
/*
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package async
func Run[T any](future Future[T]) T {
ch := make(chan T)
go func() {
future(func(v T) {
ch <- v
})
}()
return <-ch
}

69
x/async/executor_llgo.go Normal file
View File

@@ -0,0 +1,69 @@
//go:build llgo
// +build llgo
/*
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package async
import (
"unsafe"
"github.com/goplus/llgo/c/libuv"
"github.com/goplus/llgo/c/pthread"
)
var execKey pthread.Key
func init() {
execKey.Create(nil)
}
type Executor struct {
L *libuv.Loop
}
func Exec() *Executor {
v := execKey.Get()
if v == nil {
panic("async.Exec: no executor")
}
return (*Executor)(v)
}
func setExec(e *Executor) (old *Executor) {
old = (*Executor)(execKey.Get())
execKey.Set(unsafe.Pointer(e))
return
}
func (e *Executor) Run() {
e.L.Run(libuv.RUN_DEFAULT)
}
func Run[T any](future Future[T]) T {
loop := libuv.LoopNew()
exec := &Executor{loop}
oldExec := setExec(exec)
var ret T
future(func(v T) {
ret = v
})
exec.Run()
loop.Close()
setExec(oldExec)
return ret
}

View File

@@ -0,0 +1,35 @@
//go:build !llgo
// +build !llgo
/*
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package timeout
import (
"time"
"github.com/goplus/llgo/x/async"
)
func Timeout(d time.Duration) async.Future[async.Void] {
return async.Async(func(resolve func(async.Void)) {
go func() {
time.Sleep(d)
resolve(async.Void{})
}()
})
}

View File

@@ -0,0 +1,44 @@
//go:build llgo
// +build llgo
/*
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package timeout
import (
"time"
"github.com/goplus/llgo/c/libuv"
"github.com/goplus/llgo/x/async"
"github.com/goplus/llgo/x/cbind"
)
func Timeout(d time.Duration) async.Future[async.Void] {
return async.Async(func(resolve func(async.Void)) {
t, cb := cbind.BindF[libuv.Timer, libuv.TimerCb](func(t *libuv.Timer) {
resolve(async.Void{})
})
r := libuv.InitTimer(async.Exec().L, t)
if r != 0 {
panic("InitTimer failed")
}
r = t.Start(cb, uint64(d/time.Millisecond), 0)
if r != 0 {
panic("Start failed")
}
})
}

16
x/cbind/buf.go Normal file
View File

@@ -0,0 +1,16 @@
package cbind
import "unsafe"
type slice struct {
data unsafe.Pointer
len int
}
func GoBytes(buf *int8, n int) []byte {
return *(*[]byte)(unsafe.Pointer(&slice{unsafe.Pointer(buf), n}))
}
func CBuffer(data []byte) (*int8, int) {
return (*int8)(unsafe.Pointer(&data[0])), len(data)
}

143
x/cbind/cbind.go Normal file
View File

@@ -0,0 +1,143 @@
/*
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cbind
import (
"unsafe"
)
// llgo:type C
type Cb[T any] func(*T)
// llgo:type C
type Cb1[T any, A any] func(*T, A)
// llgo:type C
type Cb2[T any, A any, B any] func(*T, A, B)
// llgo:type C
type Cb3[T any, A any, B any, C any] func(*T, A, B, C)
type bind[Base any] struct {
b Base
fn func(*Base)
}
type bind1[Base any, A any] struct {
b Base
fn func(*Base, A)
}
type bind2[Base any, A any, B any] struct {
b Base
fn func(*Base, A, B)
}
type bind3[Base any, A any, B any, C any] struct {
b Base
fn func(*Base, A, B, C)
}
func callback[Base any](base *Base) {
bind := (*bind[Base])(unsafe.Pointer(base))
bind.fn(base)
}
func callback1[Base any, A any](base *Base, a A) {
bind := (*bind1[Base, A])(unsafe.Pointer(base))
bind.fn(base, a)
}
func callback2[Base any, A any, B any](base *Base, a A, b B) {
bind := (*bind2[Base, A, B])(unsafe.Pointer(base))
bind.fn(base, a, b)
}
func callback3[Base any, A any, B any, C any](base *Base, a A, b B, c C) {
bind := (*bind3[Base, A, B, C])(unsafe.Pointer(base))
bind.fn(base, a, b, c)
}
/**
* Bind[N] binds a Go function to a C callback function.
*
* Example:
*
* timer, cb := cbind.Bind[libuv.Timer](func() {
* println("hello")
* })
* libuv.InitTimer(async.Exec().L, timer)
* timer.Start(cb, 1000, 0)
*
* @param call The Go function to bind.
* @return The data pointer and the C callback function.
*/
func Bind[T any](call func(*T)) (p *T, cb Cb[T]) {
bb := &bind[T]{fn: call}
p = (*T)(unsafe.Pointer(bb))
cb = callback[T]
return
}
func BindF[T any, F ~func(*T)](call func(*T)) (*T, F) {
bb := &bind[T]{fn: call}
p := (*T)(unsafe.Pointer(bb))
var fn F = callback[T]
return p, fn
}
func Bind1[T any, A any](call func(*T, A)) (p *T, cb Cb1[T, A]) {
bb := &bind1[T, A]{fn: call}
p = (*T)(unsafe.Pointer(bb))
cb = callback1[T, A]
return
}
func Bind1F[T any, F ~func(*T, A), A any](call func(*T, A)) (*T, F) {
bb := &bind1[T, A]{fn: call}
p := (*T)(unsafe.Pointer(bb))
var fn F = callback1[T, A]
return p, fn
}
func Bind2[T any, A any, B any](call func(*T, A, B)) (p *T, cb Cb2[T, A, B]) {
bb := &bind2[T, A, B]{fn: call}
p = (*T)(unsafe.Pointer(bb))
cb = callback2[T, A, B]
return
}
func Bind2F[T any, F ~func(*T, A, B), A any, B any](call func(*T, A, B)) (*T, F) {
bb := &bind2[T, A, B]{fn: call}
p := (*T)(unsafe.Pointer(bb))
var fn F = callback2[T, A, B]
return p, fn
}
func Bind3[T any, A any, B any, C any](call func(*T, A, B, C), a A, b B, c C) (p *T, cb Cb3[T, A, B, C]) {
bb := &bind3[T, A, B, C]{fn: call}
p = (*T)(unsafe.Pointer(bb))
cb = callback3[T, A, B, C]
return
}
func Bind3F[T any, F ~func(*T, A, B, C), A any, B any, C any](call func(*T, A, B, C), a A, b B, c C) (*T, F) {
bb := &bind3[T, A, B, C]{fn: call}
p := (*T)(unsafe.Pointer(bb))
var fn F = callback3[T, A, B, C]
return p, fn
}

View File

@@ -1,290 +0,0 @@
package main
import (
"encoding/json"
"fmt"
"log"
"time"
"github.com/goplus/llgo/x/io"
)
// -----------------------------------------------------------------------------
type Response struct {
StatusCode int
mockBody string
}
func (r *Response) mock(body string) {
r.mockBody = body
}
func (r *Response) Text() (resolve io.Promise[string]) {
resolve(r.mockBody, nil)
return
}
func (r *Response) TextCompiled() *io.PromiseImpl[string] {
P := &io.PromiseImpl[string]{}
P.Func = func(resolve func(string, error)) {
for {
switch P.Prev = P.Next; P.Prev {
case 0:
resolve(r.mockBody, nil)
P.Next = -1
return
default:
panic("Promise already done")
}
}
}
return P
}
func HttpGet(url string, callback func(resp *Response, err error)) {
resp := &Response{StatusCode: 200}
callback(resp, nil)
}
func AsyncHttpGet(url string) (resolve io.Promise[*Response]) {
HttpGet(url, resolve)
return
}
func AsyncHttpGetCompiled(url string) *io.PromiseImpl[*Response] {
P := &io.PromiseImpl[*Response]{}
P.Func = func(resolve func(*Response, error)) {
for {
switch P.Prev = P.Next; P.Prev {
case 0:
HttpGet(url, resolve)
P.Next = -1
return
default:
panic("Promise already done")
}
}
}
return P
}
// -----------------------------------------------------------------------------
type User struct {
Name string
}
func GetUser(uid string) (resolve io.Promise[User]) {
resp, err := AsyncHttpGet("http://example.com/user/" + uid).Await()
if err != nil {
resolve(User{}, err)
return
}
if resp.StatusCode != 200 {
resolve(User{}, fmt.Errorf("http status code: %d", resp.StatusCode))
return
}
resp.mock(`{"name":"Alice"}`)
body, err := resp.Text().Await()
if err != nil {
resolve(User{}, err)
return
}
user := User{}
if err := json.Unmarshal([]byte(body), &user); err != nil {
resolve(User{}, err)
return
}
resolve(user, nil)
return
}
func GetUserCompiled(uid string) *io.PromiseImpl[User] {
var state1 *io.PromiseImpl[*Response]
var state2 *io.PromiseImpl[string]
P := &io.PromiseImpl[User]{}
P.Func = func(resolve func(User, error)) {
for {
switch P.Prev = P.Next; P.Prev {
case 0:
state1 = AsyncHttpGetCompiled("http://example.com/user/" + uid)
P.Next = 1
return
case 1:
state1.EnsureDone()
resp, err := state1.Value, state1.Err
if err != nil {
resolve(User{}, err)
return
}
if resp.StatusCode != 200 {
resolve(User{}, fmt.Errorf("http status code: %d", resp.StatusCode))
return
}
resp.mock(`{"name":"Alice"}`)
state2 = resp.TextCompiled()
P.Next = 2
return
case 2:
state2.EnsureDone()
body, err := state2.Value, state2.Err
if err != nil {
resolve(User{}, err)
return
}
user := User{}
if err := json.Unmarshal([]byte(body), &user); err != nil {
resolve(User{}, err)
return
}
resolve(user, nil)
P.Next = -1
return
default:
panic("Promise already done")
}
}
}
return P
}
func GetScore() *io.Promise[float64] {
panic("todo: GetScore")
}
func GetScoreCompiled() *io.PromiseImpl[float64] {
P := &io.PromiseImpl[float64]{}
P.Func = func(resolve func(float64, error)) {
for {
switch P.Prev = P.Next; P.Prev {
case 0:
panic("todo: GetScore")
default:
panic("Promise already done")
}
}
}
return P
}
func DoUpdate(op string) *io.Promise[io.Void] {
panic("todo: DoUpdate")
}
func DoUpdateCompiled(op string) *io.PromiseImpl[io.Void] {
P := &io.PromiseImpl[io.Void]{}
P.Func = func(resolve func(io.Void, error)) {
for {
switch P.Prev = P.Next; P.Prev {
case 0:
panic("todo: DoUpdate")
default:
panic("Promise already done")
}
}
}
return P
}
func Demo() (resolve io.Promise[io.Void]) {
user, err := GetUser("123").Await()
log.Println(user, err)
user, err = io.Race[User](GetUser("123"), GetUser("456"), GetUser("789")).Await()
log.Println(user, err)
users, err := io.All[User]([]io.AsyncCall[User]{GetUser("123"), GetUser("456"), GetUser("789")}).Await()
log.Println(users, err)
user, score, _, err := io.Await3[User, float64, io.Void](GetUser("123"), GetScore(), DoUpdate("update sth."))
log.Println(user, score, err)
// TODO(lijie): select from multiple promises without channel
select {
case user := <-GetUser("123").Chan():
log.Println("user:", user)
case score := <-GetScore().Chan():
log.Println("score:", score)
case <-io.Timeout(5 * time.Second).Chan():
log.Println("timeout")
}
return
}
func DemoCompiled() *io.PromiseImpl[io.Void] {
var state1 *io.PromiseImpl[User]
var state2 *io.PromiseImpl[User]
var state3 *io.PromiseImpl[[]User]
var state4 *io.PromiseImpl[io.Await3Result[User, float64, io.Void]]
P := &io.PromiseImpl[io.Void]{}
P.Func = func(resolve func(io.Void, error)) {
for {
switch P.Prev = P.Next; P.Prev {
case 0:
state1 = GetUserCompiled("123")
P.Next = 1
return
case 1:
state1.EnsureDone()
user, err := state1.Value, state1.Err
log.Printf("user: %v, err: %v\n", user, err)
state2 = io.Race[User](GetUserCompiled("123"), GetUserCompiled("456"), GetUserCompiled("789"))
P.Next = 2
return
case 2:
state2.EnsureDone()
user, err := state2.Value, state2.Err
log.Println(user, err)
state3 = io.All[User]([]io.AsyncCall[User]{GetUserCompiled("123"), GetUserCompiled("456"), GetUserCompiled("789")})
P.Next = 3
return
case 3:
state3.EnsureDone()
users, err := state3.Value, state3.Err
log.Println(users, err)
state4 = io.Await3Compiled[User, float64, io.Void](GetUserCompiled("123"), GetScoreCompiled(), DoUpdateCompiled("update sth."))
P.Next = 4
return
case 4:
state4.EnsureDone()
user, score, _, err := state4.Value.V1, state4.Value.V2, state4.Value.V3, state4.Value.Err
log.Println(user, score, err)
select {
case user := <-GetUserCompiled("123").Chan():
log.Println("user:", user)
case score := <-GetScoreCompiled().Chan():
log.Println("score:", score)
case <-io.TimeoutCompiled(5 * time.Second).Chan():
log.Println("timeout")
}
P.Next = -1
return
default:
panic("Promise already done")
}
}
}
return P
}
func main() {
log.SetFlags(log.Lshortfile | log.LstdFlags)
// io.Run(Demo())
io.Run(DemoCompiled())
}

View File

@@ -1,170 +0,0 @@
/*
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io
import (
_ "unsafe"
"time"
)
const (
LLGoPackage = "decl"
)
type Void = [0]byte
// -----------------------------------------------------------------------------
type AsyncCall[OutT any] interface {
Await(timeout ...time.Duration) (ret OutT, err error)
Chan() <-chan OutT
EnsureDone()
}
// llgo:link AsyncCall.Await llgo.await
func Await[OutT any](call AsyncCall[OutT], timeout ...time.Duration) (ret OutT, err error) {
return
}
//go:linkname Timeout llgo.timeout
func Timeout(time.Duration) (ret AsyncCall[Void])
func TimeoutCompiled(d time.Duration) *PromiseImpl[Void] {
P := &PromiseImpl[Void]{}
P.Func = func(resolve func(Void, error)) {
go func() {
time.Sleep(d)
resolve(Void{}, nil)
}()
}
return P
}
// llgo:link Race llgo.race
func Race[OutT any](acs ...AsyncCall[OutT]) (ret *PromiseImpl[OutT]) {
return
}
func All[OutT any](acs []AsyncCall[OutT]) (ret *PromiseImpl[[]OutT]) {
return nil
}
// llgo:link Await2 llgo.await
func Await2[OutT1, OutT2 any](
ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2],
timeout ...time.Duration) (ret1 OutT1, ret2 OutT2, err error) {
return
}
type Await2Result[T1 any, T2 any] struct {
V1 T1
V2 T2
Err error
}
func Await2Compiled[OutT1, OutT2 any](
ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2],
timeout ...time.Duration) (ret *PromiseImpl[Await2Result[OutT1, OutT2]]) {
return
}
// llgo:link Await3 llgo.await
func Await3[OutT1, OutT2, OutT3 any](
ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], ac3 AsyncCall[OutT3],
timeout ...time.Duration) (ret1 OutT1, ret2 OutT2, ret3 OutT3, err error) {
return
}
type Await3Result[T1 any, T2 any, T3 any] struct {
V1 T1
V2 T2
V3 T3
Err error
}
func Await3Compiled[OutT1, OutT2, OutT3 any](
ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], ac3 AsyncCall[OutT3],
timeout ...time.Duration) (ret *PromiseImpl[Await3Result[OutT1, OutT2, OutT3]]) {
return
}
func Run(ac AsyncCall[Void]) {
p := ac.(*PromiseImpl[Void])
p.Resume()
<-ac.Chan()
}
// -----------------------------------------------------------------------------
type Promise[OutT any] func(OutT, error)
// llgo:link Promise.Await llgo.await
func (p Promise[OutT]) Await(timeout ...time.Duration) (ret OutT, err error) {
return
}
func (p Promise[OutT]) Chan() <-chan OutT {
return nil
}
func (p Promise[OutT]) EnsureDone() {
}
// -----------------------------------------------------------------------------
type PromiseImpl[TOut any] struct {
Func func(resolve func(TOut, error))
Value TOut
Err error
Prev int
Next int
c chan TOut
}
func (p *PromiseImpl[TOut]) Resume() {
p.Func(func(v TOut, err error) {
p.Value = v
p.Err = err
})
}
func (p *PromiseImpl[TOut]) EnsureDone() {
if p.Next == -1 {
panic("Promise already done")
}
}
func (p *PromiseImpl[TOut]) Chan() <-chan TOut {
if p.c == nil {
p.c = make(chan TOut, 1)
p.Func(func(v TOut, err error) {
p.Value = v
p.Err = err
p.c <- v
})
}
return p.c
}
func (p *PromiseImpl[TOut]) Await(timeout ...time.Duration) (ret TOut, err error) {
panic("should not called")
}
// -----------------------------------------------------------------------------

View File

@@ -367,98 +367,23 @@ In some situations, you may want to get the first result of multiple async opera
## Design
Introduce `Promise` type to represent an asynchronous operation and its resulting value. `Promise` can be resolved with a value with an error. `Promise` can be awaited to get the value and error.
`Promise` just a type indicating the asynchronous operation, it can't be created and assigned directly. It be replaced to `PromiseImpl` by the LLGo compiler.
Introduce `async.IO[T]` type to represent an asynchronous operation, `async.Future[T]` type to represent the result of an asynchronous operation. `async.IO[T]` can be `bind` to a function that accepts `T` as an argument to chain multiple asynchronous operations. `async.IO[T]` can be `await` to get the value of the asynchronous operation.
```go
// Some native async functions
func timeoutAsync(d time.Duration, cb func()) {
go func() {
time.Sleep(d)
cb()
}()
}
package async
// Wrap callback-based async function into Promise
func resolveAfter1Second() (resolve Promise[string]) {
timeoutAsync(1 * time.Second, func() {
resolve("Resolved after 1 second", nil)
})
}
type Future[T any] func() T
type IO[T any] func() Future[T]
// Compiled to:
func resolveAfter1Second() (resolve PromiseImpl[string]) {
promise := io.NewPromiseImpl[string](resolve func(value string, err error) {
resolve: func(value string, err error) {
for true {
switch (promise.prev = promise.next) {
case 0:
timeoutAsync(1 * time.Second, func() {
resolve("Resolved after 1 second", nil)
})
}
}
},
}
return promise
}
func asyncCall() (resolve Promise[string]) {
str, err := resolveAfter1Second().Await()
resolve("AsyncCall: " + str, err)
}
// Compiled to:
func asyncCall() (resolve PromiseImpl[string]) {
promise := io.NewPromiseImpl[string](resolve func(value string, err error) {
for true {
switch (promise.prev = promise.next) {
case 0:
resolveAfter1Second()
return
case 1:
str, err := promise.value, promise.err
resolve("AsyncCall: " + str, err)
return
}
func main() {
io := func() Future[string] {
return func() string {
return "Hello, World!"
}
})
return promise
}
}
// Directly return Promise
func asyncCall2() Promise[string] {
return resolveAfter1Second()
}
// Compiled to:
func asyncCall2() PromiseImpl[string] {
return resolveAfter1Second()
}
// Don't wait for Promise to complete
func asyncCall3() {
resolveAfter1Second().Then(func(result string) {
fmt.Println("AsyncCall3: " + result)
})
}
func asyncMain() {
fmt.Println("Starting AsyncCall")
result1 := asyncCall().Await()
fmt.Println(result1)
fmt.Println("Starting AsyncCall2")
result2 := asyncCall2().Await()
fmt.Println(result2)
fmt.Println("Starting AsyncCall3")
asyncCall3()
// Wait for AsyncCall3 to complete
time.Sleep(2 * time.Second)
fmt.Println("Main function completed")
future := io()
value := future()
println(value)
}
```

92
x/socketio/socketio_go.go Normal file
View File

@@ -0,0 +1,92 @@
//go:build !llgo
// +build !llgo
/*
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package socketio
import (
"net"
"github.com/goplus/llgo/x/async"
"github.com/goplus/llgo/x/tuple"
)
type Conn struct {
conn net.Conn
}
func Listen(protocol, bindAddr string, listenCb func(client *Conn, err error)) {
go func() {
listener, err := net.Listen(protocol, bindAddr)
if err != nil {
listenCb(nil, err)
return
}
for {
conn, err := listener.Accept()
if err != nil {
listenCb(nil, err)
return
}
listenCb(&Conn{conn: conn}, nil)
}
}()
}
func Connect(network, addr string) async.Future[tuple.Tuple2[*Conn, error]] {
return async.Async(func(resolve func(tuple.Tuple2[*Conn, error])) {
go func() {
conn, err := net.Dial(network, addr)
if err != nil {
resolve(tuple.T2[*Conn, error](nil, err))
return
}
resolve(tuple.T2[*Conn, error](&Conn{conn: conn}, nil))
}()
})
}
// Read once from the TCP connection.
func (t *Conn) Read() async.Future[tuple.Tuple2[[]byte, error]] {
return async.Async(func(resolve func(tuple.Tuple2[[]byte, error])) {
go func() {
buf := make([]byte, 1024)
n, err := t.conn.Read(buf)
if err != nil {
resolve(tuple.T2[[]byte, error](nil, err))
return
}
resolve(tuple.T2[[]byte, error](buf[:n], nil))
}()
})
}
func (t *Conn) Write(data []byte) async.Future[error] {
return async.Async(func(resolve func(error)) {
go func() {
_, err := t.conn.Write(data)
resolve(err)
}()
})
}
func (t *Conn) Close() {
if t.conn != nil {
t.conn.Close()
}
}

260
x/socketio/socketio_llgo.go Normal file
View File

@@ -0,0 +1,260 @@
//go:build llgo
// +build llgo
/*
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package socketio
import (
"strings"
"syscall"
"unsafe"
_ "unsafe"
"github.com/goplus/llgo/c"
"github.com/goplus/llgo/c/libuv"
"github.com/goplus/llgo/c/net"
"github.com/goplus/llgo/x/async"
"github.com/goplus/llgo/x/cbind"
"github.com/goplus/llgo/x/tuple"
)
type Listener struct {
tcp libuv.Tcp
listenCb func(server *Listener, err error)
}
type Conn struct {
tcp libuv.Tcp
readCb func([]byte, error)
}
type libuvError libuv.Errno
func (e libuvError) Error() string {
s := libuv.Strerror(libuv.Errno(e))
return c.GoString(s, c.Strlen(s))
}
type getAddrInfoBind struct {
libuv.GetAddrInfo
resolve func(tuple.Tuple2[*net.SockAddr, error])
}
func getAddrInfoCb(p *libuv.GetAddrInfo, status c.Int, addr *net.AddrInfo) {
bind := (*getAddrInfoBind)(unsafe.Pointer(p))
if status != 0 {
bind.resolve(tuple.T2[*net.SockAddr, error](nil, libuvError(status)))
return
}
bind.resolve(tuple.T2[*net.SockAddr, error](addr.Addr, nil))
}
func parseAddr(addr string) async.Future[tuple.Tuple2[*net.SockAddr, error]] {
return async.Async(func(resolve func(tuple.Tuple2[*net.SockAddr, error])) {
host := "127.0.0.1"
var port string
// split host and service by last colon
idx := strings.LastIndex(addr, ":")
if idx < 0 {
port = addr
} else {
host = addr[:idx]
port = addr[idx+1:]
}
hints := &net.AddrInfo{
Family: net.AF_INET,
SockType: net.SOCK_STREAM,
Protocol: syscall.IPPROTO_TCP,
Flags: 0,
}
req, cb := cbind.Bind2F[libuv.GetAddrInfo, libuv.GetaddrinfoCb](func(i *libuv.GetAddrInfo, status c.Int, addr *net.AddrInfo) {
if status != 0 {
resolve(tuple.T2[*net.SockAddr, error](nil, libuvError(status)))
return
}
resolve(tuple.T2[*net.SockAddr, error](addr.Addr, nil))
})
if res := libuv.Getaddrinfo(async.Exec().L, req, cb, c.AllocaCStr(host), c.AllocaCStr(port), hints); res != 0 {
resolve(tuple.T2[*net.SockAddr, error](nil, libuvError(res)))
return
}
})
}
func Listen(protocol, bindAddr string, listenCb func(client *Conn, err error)) {
tcp, err := newListener()
if err != nil {
listenCb(nil, err)
return
}
parseAddr(bindAddr)(func(v tuple.Tuple2[*net.SockAddr, error]) {
addr, err := v.Get()
if err != nil {
listenCb(nil, err)
return
}
if err := tcp.bind(addr, 0); err != nil {
listenCb(nil, err)
return
}
if err := tcp.listen(128, func(server *Listener, err error) {
client, err := server.accept()
listenCb(client, err)
}); err != nil {
listenCb(nil, err)
}
})
}
func newListener() (*Listener, error) {
t := &Listener{}
if res := libuv.InitTcp(async.Exec().L, &t.tcp); res != 0 {
return nil, libuvError(res)
}
return t, nil
}
func (t *Listener) bind(addr *net.SockAddr, flags uint) error {
if res := t.tcp.Bind(addr, c.Uint(flags)); res != 0 {
return libuvError(res)
}
return nil
}
func (l *Listener) listen(backlog int, cb func(server *Listener, err error)) error {
l.listenCb = cb
res := (*libuv.Stream)(&l.tcp).Listen(c.Int(backlog), func(s *libuv.Stream, status c.Int) {
server := (*Listener)(unsafe.Pointer(s))
if status != 0 {
server.listenCb(server, libuvError(libuv.Errno(status)))
} else {
server.listenCb(server, nil)
}
})
if res != 0 {
return libuvError(res)
}
return nil
}
func (l *Listener) accept() (client *Conn, err error) {
tcp := &Conn{}
if res := libuv.InitTcp(async.Exec().L, &tcp.tcp); res != 0 {
return nil, libuvError(res)
}
if res := (*libuv.Stream)(&l.tcp).Accept((*libuv.Stream)(&tcp.tcp)); res != 0 {
return nil, libuvError(res)
}
return tcp, nil
}
func Connect(network, addr string) async.Future[tuple.Tuple2[*Conn, error]] {
return async.Async(func(resolve func(tuple.Tuple2[*Conn, error])) {
parseAddr(addr)(func(v tuple.Tuple2[*net.SockAddr, error]) {
addr, err := v.Get()
if err != nil {
resolve(tuple.T2[*Conn, error]((*Conn)(nil), err))
return
}
tcp := &Conn{}
if res := libuv.InitTcp(async.Exec().L, &tcp.tcp); res != 0 {
resolve(tuple.T2[*Conn, error]((*Conn)(nil), libuvError(res)))
return
}
req, cb := cbind.Bind1F[libuv.Connect, libuv.ConnectCb](func(c *libuv.Connect, status c.Int) {
if status != 0 {
resolve(tuple.T2[*Conn, error]((*Conn)(nil), libuvError(libuv.Errno(status))))
} else {
resolve(tuple.T2[*Conn, error](tcp, nil))
}
})
if res := libuv.TcpConnect(req, &tcp.tcp, addr, cb); res != 0 {
resolve(tuple.T2[*Conn, error]((*Conn)(nil), libuvError(res)))
return
}
})
})
}
func allocBuffer(handle *libuv.Handle, suggestedSize uintptr, buf *libuv.Buf) {
buf.Base = (*c.Char)(c.Malloc(suggestedSize))
buf.Len = suggestedSize
}
func (t *Conn) StartRead(fn func(data []byte, err error)) {
t.readCb = func(data []byte, err error) {
fn(data, err)
}
tcp := (*libuv.Stream)(&t.tcp)
res := tcp.StartRead(allocBuffer, func(client *libuv.Stream, nread c.Long, buf *libuv.Buf) {
tcp := (*Conn)(unsafe.Pointer(client))
if nread > 0 {
tcp.readCb(cbind.GoBytes(buf.Base, int(nread)), nil)
} else if nread < 0 {
tcp.readCb(nil, libuvError(libuv.Errno(nread)))
} else {
tcp.readCb(nil, nil)
}
})
if res != 0 {
t.readCb(nil, libuvError(libuv.Errno(res)))
}
}
func (t *Conn) StopRead() error {
tcp := (*libuv.Stream)(&t.tcp)
if res := tcp.StopRead(); res != 0 {
return libuvError(libuv.Errno(res))
}
return nil
}
// Read once from the TCP connection.
func (t *Conn) Read() async.Future[tuple.Tuple2[[]byte, error]] {
return async.Async(func(resolve func(tuple.Tuple2[[]byte, error])) {
t.StartRead(func(data []byte, err error) {
if err := t.StopRead(); err != nil {
panic(err)
}
resolve(tuple.T2[[]byte, error](data, err))
})
})
}
func (t *Conn) Write(data []byte) async.Future[error] {
return async.Async(func(resolve func(error)) {
writer, cb := cbind.Bind1F[libuv.Write, libuv.WriteCb](func(req *libuv.Write, status c.Int) {
var result error
if status != 0 {
result = libuvError(libuv.Errno(status))
}
resolve(result)
})
tcp := (*libuv.Stream)(&t.tcp)
buf, len := cbind.CBuffer(data)
bufs := &libuv.Buf{Base: buf, Len: uintptr(len)}
writer.Write(tcp, bufs, 1, cb)
})
}
func (t *Conn) Close() {
(*libuv.Handle)(unsafe.Pointer(&t.tcp)).Close(nil)
}

40
x/tuple/tuple.go Normal file
View File

@@ -0,0 +1,40 @@
package tuple
type Tuple[T any] struct {
v T
}
func T[T any](v T) Tuple[T] {
return Tuple[T]{v: v}
}
func (t Tuple[T]) Get() T {
return t.v
}
type Tuple2[T1 any, T2 any] struct {
v1 T1
v2 T2
}
func T2[T1 any, T2 any](v1 T1, v2 T2) Tuple2[T1, T2] {
return Tuple2[T1, T2]{v1: v1, v2: v2}
}
func (t Tuple2[T1, T2]) Get() (T1, T2) {
return t.v1, t.v2
}
type Tuple3[T1 any, T2 any, T3 any] struct {
v1 T1
v2 T2
v3 T3
}
func T3[T1 any, T2 any, T3 any](v1 T1, v2 T2, v3 T3) Tuple3[T1, T2, T3] {
return Tuple3[T1, T2, T3]{v1: v1, v2: v2, v3: v3}
}
func (t Tuple3[T1, T2, T3]) Get() (T1, T2, T3) {
return t.v1, t.v2, t.v3
}