Compare commits

...

18 Commits

Author SHA1 Message Date
xushiwei
683129b6a5 Merge pull request #781 from cpunion/future-io
Future IO update
2024-09-12 12:48:25 +08:00
Li Jie
7f4022120e fix deadlock 2024-09-10 14:38:46 +08:00
Li Jie
3f9e86c37a x 2024-09-10 11:49:42 +08:00
Li Jie
12f460e376 async.Run as global context, async operations run immediately 2024-09-10 11:43:44 +08:00
Li Jie
44c4488fcc async doc update 2024-09-09 10:41:22 +08:00
Li Jie
44617b6554 future supports multi-await but run once 2024-09-09 09:34:29 +08:00
Li Jie
ccc7d056ba socketio example: two tcp clients 2024-09-08 20:29:24 +08:00
Li Jie
566d5ef96f add Future.Then 2024-09-08 20:27:05 +08:00
xushiwei
cf53f3a347 Merge pull request #778 from cpunion/future-io
Future I/O
2024-09-08 17:37:33 +08:00
Li Jie
d2538d08a7 code clean 2024-09-07 10:20:02 +08:00
visualfc
75fe9d61a3 cl: function fix freevars cache 2024-09-07 10:04:38 +08:00
Li Jie
fce0672282 make future IO working both on go and llgo 2024-09-07 10:04:34 +08:00
Li Jie
69a2a01bc7 cbind.Bind: expose *Base argument 2024-09-07 09:45:23 +08:00
Li Jie
a2d4e79c20 new future IO and demo 2024-09-07 09:45:05 +08:00
Li Jie
6e0a9b2b48 cbind.BindF 2024-09-07 09:43:48 +08:00
Li Jie
276f2070ee hide tuple fields, only expose tuple.TN(...) and tuple.Get() 2024-09-07 09:43:48 +08:00
Li Jie
1a158b5de3 async: work both go and llgo 2024-09-07 09:43:48 +08:00
Li Jie
d4a72bf661 async.Run/Await/Race/All 2024-09-07 09:43:48 +08:00
20 changed files with 1460 additions and 550 deletions

14
cl/_testrt/freevars/in.go Normal file
View File

@@ -0,0 +1,14 @@
package main
func main() {
func(resolve func(error)) {
func(err error) {
if err != nil {
resolve(err)
return
}
resolve(nil)
}(nil)
}(func(err error) {
})
}

118
cl/_testrt/freevars/out.ll Normal file
View File

@@ -0,0 +1,118 @@
; ModuleID = 'main'
source_filename = "main"
%"github.com/goplus/llgo/internal/runtime.iface" = type { ptr, ptr }
%"github.com/goplus/llgo/internal/runtime.eface" = type { ptr, ptr }
@"main.init$guard" = global i1 false, align 1
@__llgo_argc = global i32 0, align 4
@__llgo_argv = global ptr null, align 8
define void @main.init() {
_llgo_0:
%0 = load i1, ptr @"main.init$guard", align 1
br i1 %0, label %_llgo_2, label %_llgo_1
_llgo_1: ; preds = %_llgo_0
store i1 true, ptr @"main.init$guard", align 1
br label %_llgo_2
_llgo_2: ; preds = %_llgo_1, %_llgo_0
ret void
}
define i32 @main(i32 %0, ptr %1) {
_llgo_0:
store i32 %0, ptr @__llgo_argc, align 4
store ptr %1, ptr @__llgo_argv, align 8
call void @"github.com/goplus/llgo/internal/runtime.init"()
call void @main.init()
%2 = alloca { ptr, ptr }, align 8
%3 = getelementptr inbounds { ptr, ptr }, ptr %2, i32 0, i32 0
store ptr @"__llgo_stub.main.main$2", ptr %3, align 8
%4 = getelementptr inbounds { ptr, ptr }, ptr %2, i32 0, i32 1
store ptr null, ptr %4, align 8
%5 = load { ptr, ptr }, ptr %2, align 8
call void @"main.main$1"({ ptr, ptr } %5)
ret i32 0
}
define void @"main.main$1"({ ptr, ptr } %0) {
_llgo_0:
%1 = call ptr @"github.com/goplus/llgo/internal/runtime.AllocZ"(i64 16)
store { ptr, ptr } %0, ptr %1, align 8
%2 = call ptr @"github.com/goplus/llgo/internal/runtime.AllocU"(i64 8)
%3 = getelementptr inbounds { ptr }, ptr %2, i32 0, i32 0
store ptr %1, ptr %3, align 8
%4 = alloca { ptr, ptr }, align 8
%5 = getelementptr inbounds { ptr, ptr }, ptr %4, i32 0, i32 0
store ptr @"main.main$1$1", ptr %5, align 8
%6 = getelementptr inbounds { ptr, ptr }, ptr %4, i32 0, i32 1
store ptr %2, ptr %6, align 8
%7 = load { ptr, ptr }, ptr %4, align 8
%8 = extractvalue { ptr, ptr } %7, 1
%9 = extractvalue { ptr, ptr } %7, 0
call void %9(ptr %8, %"github.com/goplus/llgo/internal/runtime.iface" zeroinitializer)
ret void
}
define void @"main.main$1$1"(ptr %0, %"github.com/goplus/llgo/internal/runtime.iface" %1) {
_llgo_0:
%2 = call ptr @"github.com/goplus/llgo/internal/runtime.IfaceType"(%"github.com/goplus/llgo/internal/runtime.iface" %1)
%3 = extractvalue %"github.com/goplus/llgo/internal/runtime.iface" %1, 1
%4 = alloca %"github.com/goplus/llgo/internal/runtime.eface", align 8
%5 = getelementptr inbounds %"github.com/goplus/llgo/internal/runtime.eface", ptr %4, i32 0, i32 0
store ptr %2, ptr %5, align 8
%6 = getelementptr inbounds %"github.com/goplus/llgo/internal/runtime.eface", ptr %4, i32 0, i32 1
store ptr %3, ptr %6, align 8
%7 = load %"github.com/goplus/llgo/internal/runtime.eface", ptr %4, align 8
%8 = call ptr @"github.com/goplus/llgo/internal/runtime.IfaceType"(%"github.com/goplus/llgo/internal/runtime.iface" zeroinitializer)
%9 = alloca %"github.com/goplus/llgo/internal/runtime.eface", align 8
%10 = getelementptr inbounds %"github.com/goplus/llgo/internal/runtime.eface", ptr %9, i32 0, i32 0
store ptr %8, ptr %10, align 8
%11 = getelementptr inbounds %"github.com/goplus/llgo/internal/runtime.eface", ptr %9, i32 0, i32 1
store ptr null, ptr %11, align 8
%12 = load %"github.com/goplus/llgo/internal/runtime.eface", ptr %9, align 8
%13 = call i1 @"github.com/goplus/llgo/internal/runtime.EfaceEqual"(%"github.com/goplus/llgo/internal/runtime.eface" %7, %"github.com/goplus/llgo/internal/runtime.eface" %12)
%14 = xor i1 %13, true
br i1 %14, label %_llgo_1, label %_llgo_2
_llgo_1: ; preds = %_llgo_0
%15 = load { ptr }, ptr %0, align 8
%16 = extractvalue { ptr } %15, 0
%17 = load { ptr, ptr }, ptr %16, align 8
%18 = extractvalue { ptr, ptr } %17, 1
%19 = extractvalue { ptr, ptr } %17, 0
call void %19(ptr %18, %"github.com/goplus/llgo/internal/runtime.iface" %1)
ret void
_llgo_2: ; preds = %_llgo_0
%20 = load { ptr }, ptr %0, align 8
%21 = extractvalue { ptr } %20, 0
%22 = load { ptr, ptr }, ptr %21, align 8
%23 = extractvalue { ptr, ptr } %22, 1
%24 = extractvalue { ptr, ptr } %22, 0
call void %24(ptr %23, %"github.com/goplus/llgo/internal/runtime.iface" zeroinitializer)
ret void
}
define void @"main.main$2"(%"github.com/goplus/llgo/internal/runtime.iface" %0) {
_llgo_0:
ret void
}
declare void @"github.com/goplus/llgo/internal/runtime.init"()
define linkonce void @"__llgo_stub.main.main$2"(ptr %0, %"github.com/goplus/llgo/internal/runtime.iface" %1) {
_llgo_0:
tail call void @"main.main$2"(%"github.com/goplus/llgo/internal/runtime.iface" %1)
ret void
}
declare ptr @"github.com/goplus/llgo/internal/runtime.AllocZ"(i64)
declare ptr @"github.com/goplus/llgo/internal/runtime.AllocU"(i64)
declare i1 @"github.com/goplus/llgo/internal/runtime.EfaceEqual"(%"github.com/goplus/llgo/internal/runtime.eface", %"github.com/goplus/llgo/internal/runtime.eface")
declare ptr @"github.com/goplus/llgo/internal/runtime.IfaceType"(%"github.com/goplus/llgo/internal/runtime.iface")

View File

@@ -171,6 +171,7 @@ type aFunction struct {
params []Type
freeVars Expr
freeVarsBlock int
base int // base = 1 if hasFreeVars; base = 0 otherwise
hasVArg bool
}
@@ -249,12 +250,13 @@ func (p Function) Param(i int) Expr {
}
func (p Function) closureCtx(b Builder) Expr {
if p.freeVars.IsNil() {
if p.freeVars.IsNil() || (p.freeVarsBlock != 0 && p.freeVarsBlock != b.blk.Index()) {
if p.base == 0 {
panic("ssa: function has no free variables")
}
ptr := Expr{p.impl.Param(0), p.params[0]}
p.freeVars = b.Load(ptr)
p.freeVarsBlock = b.blk.Index()
}
return p.freeVars
}

View File

@@ -361,104 +361,83 @@ In some situations, you may want to get the first result of multiple async opera
## Design considerations in LLGo
- Don't introduce `async`/`await` keywords to compatible with Go compiler (just compiling)
- For performance reason don't implement async functions with goroutines
- Avoid implementing `Promise` by using `chan` to avoid blocking the thread, but it can be wrapped as a `chan` to make it compatible `select` statement
- Don't introduce `async`/`await` keywords to compatible with Go
- For performance and memory reasons don't implement async functions with goroutines, coroutines, or other mechanisms that require per-task stack allocation
- Avoid implementing async task by using `chan` that blocking the thread
## Design
Introduce `Promise` type to represent an asynchronous operation and its resulting value. `Promise` can be resolved with a value with an error. `Promise` can be awaited to get the value and error.
### `async.Future[T]` type
`Promise` just a type indicating the asynchronous operation, it can't be created and assigned directly. It be replaced to `PromiseImpl` by the LLGo compiler.
Introduce `async.Future[T]` type to represent an eventual completion (or failure) of an asynchronous operation and its resulting value, similar to `Promise`/`Future` in other languages. Functions that return `async.Future[T]` are considered asynchronous functions.
### Future creation
`async.Future[T]` can be created by `async.Async[T]` function that takes a function that accepts a `resolve` function to produce a value of type `T`.
### Future chaining (asynchronous callbacks style)
`async.Future[T]` can be chained with `Then` method to add multiple callbacks to be executed when the operation is completed, it just runs once and calls every callbacks. Currently `Then` method can't be chained multiple times because Go doesn't support generics method (Need support `func (f Future[T]) Then[U any](f func(T) Future[U]) Future[U]`), maybe implements in Go+.
### Future waiting (synchronous style)
`async.Await[T]` function can be used to wait for the completion of a `Future[T]` and return the value produced by the operation. In LLGo, `async.Await[T]` is a blocking function that waits for the completion of the `Future[T]` and returns the value synchronously, it would be transformed to `Future.Then` callback in the frontend.
### `async.Run[T]` function
`async.Run[T]` function can be used to create an global asynchronous context and run async functions, and it would be hidden by the compiler in the future.
Currently it will switch the callbacks to the goroutine that calls `async.Run[T]` function, this maybe changed in the future to reduce the overhead of switching goroutines and make it more parallel.
### Prototype
```go
// Some native async functions
func timeoutAsync(d time.Duration, cb func()) {
go func() {
time.Sleep(d)
cb()
}()
package async
type Future[T any] interface {
Then(f func(T))
}
// Wrap callback-based async function into Promise
func resolveAfter1Second() (resolve Promise[string]) {
timeoutAsync(1 * time.Second, func() {
resolve("Resolved after 1 second", nil)
})
}
func Async[T any](f func(resolve func(T))) Future[T]
// Compiled to:
func resolveAfter1Second() (resolve PromiseImpl[string]) {
promise := io.NewPromiseImpl[string](resolve func(value string, err error) {
resolve: func(value string, err error) {
for true {
switch (promise.prev = promise.next) {
case 0:
timeoutAsync(1 * time.Second, func() {
resolve("Resolved after 1 second", nil)
func Await[T any](future Future[T]) T
```
### Some async functions
```go
package async
func Race[T1 any](futures ...Future[T1]) Future[T1]
func All[T1 any](futures ...Future[T1]) Future[[]T1]
```
### Example
```go
package main
func main() {
async.Run(func() {
hello := func() async.Future[string] {
return async.Async(func(resolve func(string)) {
resolve("Hello, World!")
})
}
}
},
}
return promise
}
func asyncCall() (resolve Promise[string]) {
str, err := resolveAfter1Second().Await()
resolve("AsyncCall: " + str, err)
}
// Compiled to:
func asyncCall() (resolve PromiseImpl[string]) {
promise := io.NewPromiseImpl[string](resolve func(value string, err error) {
for true {
switch (promise.prev = promise.next) {
case 0:
resolveAfter1Second()
return
case 1:
str, err := promise.value, promise.err
resolve("AsyncCall: " + str, err)
return
}
}
future := hello()
future.Then(func(value string) {
println("first callback:", value)
})
return promise
}
// Directly return Promise
func asyncCall2() Promise[string] {
return resolveAfter1Second()
}
// Compiled to:
func asyncCall2() PromiseImpl[string] {
return resolveAfter1Second()
}
// Don't wait for Promise to complete
func asyncCall3() {
resolveAfter1Second().Then(func(result string) {
fmt.Println("AsyncCall3: " + result)
future.Then(func(value string) {
println("second callback:", value)
})
}
func asyncMain() {
fmt.Println("Starting AsyncCall")
result1 := asyncCall().Await()
fmt.Println(result1)
fmt.Println("Starting AsyncCall2")
result2 := asyncCall2().Await()
fmt.Println(result2)
fmt.Println("Starting AsyncCall3")
asyncCall3()
// Wait for AsyncCall3 to complete
time.Sleep(2 * time.Second)
fmt.Println("Main function completed")
println("first await:", async.Await(future))
println("second await:", async.Await(future))
})
}
```

17
x/async/TODO.md Normal file
View File

@@ -0,0 +1,17 @@
讨论:
1. Future 用 interface 还是闭包:性能应该差不多,如果没有其他方法要暴露,感觉也没有换成 interface 的必要interface 多一个对象分配。先添加 Then 方法方便未来替换。
2. 几个方法提供不同参数个数的版本还是用 tuple如果编译器不支持可变泛型参数个数和特化我倾向用 tuple 先简化实现tuple 的开销应该也容易被编译器优化掉。多个方法让用户选择 Await2/Await3 这种也恶心。
3. 是否 Cancellable暂时不加进去多一个 context也不一定能快速稳定下来可以后面根据实践再改。
4. Executor 可能会变化,目前提供的 Run 是阻塞的,也可以把它做成异步。
5. 尽量再隐藏一些辅助类型,比如 TupleN可能之提供 tuple 的构造和返回多值。内部的 libuv 如果隐藏可能要暴露同等接口,先不动了
6. 性能可能做个简单测试,但不是关键,只要别太差。未来可能会尽量减少 executor 的切换、尽量多并行
7. 异常兼容性:目前没考虑,这个要在回调里处理可能困难,要么就在 await 上处理,可以往后放一下,毕竟 golang 主要是以 error 为主
8. 可能先看一下如何在 go+里面集成,判断目前的设计实现是否合理
9. 多封装一些库看看通用性和易用性,\_demo 里几个简单例子基本符合预期,还需要更多检验
TODO
[ ] 1. select 兼容 (可能把 Future 改为 interface 更合理?)
[x] 2. Future 多个 Await 只会被执行一次
[x] 3. Future 添加 Then 方法,不推荐直接当作函数调用,方便未来切换

289
x/async/_demo/all/all.go Normal file
View File

@@ -0,0 +1,289 @@
package main
import (
"fmt"
"os"
"sync/atomic"
"time"
"github.com/goplus/llgo/x/async"
"github.com/goplus/llgo/x/async/timeout"
"github.com/goplus/llgo/x/socketio"
"github.com/goplus/llgo/x/tuple"
)
func ReadFile(fileName string) async.Future[tuple.Tuple2[[]byte, error]] {
return async.Async(func(resolve func(tuple.Tuple2[[]byte, error])) {
go func() {
println("read file", fileName)
bytes, err := os.ReadFile(fileName)
resolve(tuple.T2(bytes, err))
}()
})
}
func WriteFile(fileName string, content []byte) async.Future[error] {
return async.Async(func(resolve func(error)) {
go func() {
err := os.WriteFile(fileName, content, 0644)
resolve(err)
}()
})
}
func sleep(i int, d time.Duration) async.Future[int] {
return async.Async(func(resolve func(int)) {
timeout.Timeout(d).Then(func(async.Void) {
resolve(i)
})
})
}
func main() {
async.Run(func(resolve func(async.Void)) {
RunIO()
RunAllAndRace()
RunTimeout()
RunMultipleCallbacksNodelay()
RunMultipleCallbacksDelay()
RunSocket()
})
}
func RunIO() {
println("RunIO with Await")
// Hide `resolve` in Go+
println("read file")
content, err := async.Await(ReadFile("all.go")).Get()
if err != nil {
fmt.Printf("read err: %v\n", err)
return
}
fmt.Printf("read content: %s\n", content)
err = async.Await(WriteFile("2.out", content))
if err != nil {
fmt.Printf("write err: %v\n", err)
return
}
fmt.Printf("write done\n")
// Translated Await to BindIO in Go+:
println("RunIO with BindIO")
ReadFile("all.go").Then(func(v tuple.Tuple2[[]byte, error]) {
content, err := v.Get()
if err != nil {
fmt.Printf("read err: %v\n", err)
return
}
fmt.Printf("read content: %s\n", content)
WriteFile("2.out", content).Then(func(v error) {
err = v
if err != nil {
fmt.Printf("write err: %v\n", err)
return
}
println("write done")
})
})
}
func RunAllAndRace() {
ms100 := 100 * time.Millisecond
ms200 := 200 * time.Millisecond
ms300 := 300 * time.Millisecond
println("Run All with Await")
async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)).Then(func(v []int) {
fmt.Printf("All: %v\n", v)
})
println("Run Race with Await")
first := async.Race(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300))
v := async.Await(first)
fmt.Printf("Race: %v\n", v)
// Translated to in Go+:
println("Run All with BindIO")
async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)).Then(func(v []int) {
fmt.Printf("All: %v\n", v)
})
println("Run Race with BindIO")
async.Race(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)).Then(func(v int) {
fmt.Printf("Race: %v\n", v)
})
}
func RunTimeout() {
println("Run Timeout with Await")
fmt.Printf("Start 100 ms timeout\n")
async.Await(timeout.Timeout(100 * time.Millisecond))
fmt.Printf("timeout\n")
// Translated to in Go+:
println("Run Timeout with BindIO")
fmt.Printf("Start 100 ms timeout\n")
timeout.Timeout(100 * time.Millisecond).Then(func(async.Void) {
fmt.Printf("timeout\n")
})
}
func RunMultipleCallbacksNodelay() {
println("Run Multiple Callbacks")
runCnt := atomic.Int32{}
nodelay := async.Async(func(resolve func(async.Void)) {
println("nodelay")
runCnt.Add(1)
})
cbCnt := atomic.Int32{}
cb := func() {
if cbCnt.Add(1) == 2 {
if runCnt.Load() != 1 {
panic("runCnt != 1, got: " + fmt.Sprint(runCnt.Load()))
} else {
println("runCnt == 1")
}
}
}
nodelay.Then(func(async.Void) {
println("nodelay done")
cb()
})
nodelay.Then(func(async.Void) {
println("nodelay done again")
cb()
})
}
func RunMultipleCallbacksDelay() {
println("Run Multiple Callbacks")
runCnt := atomic.Int32{}
delay := async.Async(func(resolve func(async.Void)) {
timeout.Timeout(100 * time.Millisecond).Then(func(async.Void) {
println("delay")
runCnt.Add(1)
})
})
cbCnt := atomic.Int32{}
cb := func() {
if cbCnt.Add(1) == 2 {
if runCnt.Load() != 1 {
panic("runCnt != 1, got: " + fmt.Sprint(runCnt.Load()))
} else {
println("runCnt == 1")
}
}
}
delay.Then(func(async.Void) {
println("delay done")
cb()
})
delay.Then(func(async.Void) {
println("delay done again")
cb()
})
}
func RunSocket() {
println("Run Socket")
println("RunServer")
RunServer().Then(func(async.Void) {
println("RunServer done")
})
println("RunClient")
timeout.Timeout(100 * time.Millisecond).Then(func(async.Void) {
RunClient("Bob").Then(func(async.Void) {
println("RunClient done")
})
RunClient("Uncle").Then(func(async.Void) {
println("RunClient done")
})
})
}
func RunClient(name string) async.Future[async.Void] {
return async.Async(func(resolve func(async.Void)) {
addr := "127.0.0.1:3927"
socketio.Connect("tcp", addr).Then(func(v tuple.Tuple2[*socketio.Conn, error]) {
client, err := v.Get()
println("Connected", client, err)
if err != nil {
panic(err)
}
counter := 0
var loop func(client *socketio.Conn)
loop = func(client *socketio.Conn) {
counter++
data := fmt.Sprintf("Hello from %s %d", name, counter)
client.Write([]byte(data)).Then(func(err error) {
if err != nil {
panic(err)
}
client.Read().Then(func(v tuple.Tuple2[[]byte, error]) {
data, err := v.Get()
if err != nil {
panic(err)
}
println("Read from server:", string(data))
timeout.Timeout(1 * time.Second).Then(func(async.Void) {
loop(client)
})
})
})
}
loop(client)
})
})
}
func RunServer() async.Future[async.Void] {
return async.Async(func(resolve func(async.Void)) {
socketio.Listen("tcp", "0.0.0.0:3927", func(client *socketio.Conn, err error) {
println("Client connected", client, err)
var loop func(client *socketio.Conn)
loop = func(client *socketio.Conn) {
client.Read().Then(func(v tuple.Tuple2[[]byte, error]) {
data, err := v.Get()
if err != nil {
println("Read error", err)
} else {
println("Read from client:", string(data))
client.Write(data).Then(func(err error) {
if err != nil {
println("Write error", err)
} else {
loop(client)
}
})
}
})
}
loop(client)
})
})
}

34
x/async/async.go Normal file
View File

@@ -0,0 +1,34 @@
/*
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package async
import (
_ "unsafe"
)
type Void = [0]byte
type Future[T any] func(func(T))
func (f Future[T]) Then(cb func(T)) {
f(cb)
}
// Just for pure LLGo/Go, transpile to callback in Go+
func Await[T1 any](future Future[T1]) T1 {
return Run(future)
}

84
x/async/async_go.go Normal file
View File

@@ -0,0 +1,84 @@
//go:build !llgo
// +build !llgo
/*
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package async
import "sync"
func Async[T any](fn func(func(T))) Future[T] {
var once sync.Once
var result T
var wg sync.WaitGroup
wg.Add(1)
once.Do(func() {
go func() {
fn(func(v T) {
result = v
wg.Done()
})
}()
})
return func(chain func(T)) {
go func() {
wg.Wait()
chain(result)
}()
}
}
// -----------------------------------------------------------------------------
func Race[T1 any](futures ...Future[T1]) Future[T1] {
return Async(func(resolve func(T1)) {
ch := make(chan T1)
for _, future := range futures {
future := future
future.Then(func(v T1) {
defer func() {
// Avoid panic when the channel is closed.
_ = recover()
}()
ch <- v
})
}
v := <-ch
close(ch)
resolve(v)
})
}
func All[T1 any](futures ...Future[T1]) Future[[]T1] {
return Async(func(resolve func([]T1)) {
n := len(futures)
results := make([]T1, n)
wg := sync.WaitGroup{}
wg.Add(n)
for i, future := range futures {
i := i
future.Then(func(v T1) {
results[i] = v
wg.Done()
})
}
wg.Wait()
resolve(results)
})
}

104
x/async/async_llgo.go Normal file
View File

@@ -0,0 +1,104 @@
//go:build llgo
// +build llgo
/*
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package async
import (
"sync"
"sync/atomic"
"github.com/goplus/llgo/c/libuv"
"github.com/goplus/llgo/x/cbind"
)
// Currently Async run chain a future that call chain in the goroutine running `async.Run`.
func Async[T any](fn func(func(T))) Future[T] {
var result T
var resultReady atomic.Bool
var callbacks []func(T)
var mutex sync.Mutex
loop := Exec().L
var a *libuv.Async
var cb libuv.AsyncCb
a, cb = cbind.BindF[libuv.Async, libuv.AsyncCb](func(a *libuv.Async) {
a.Close(nil)
mutex.Lock()
currentCallbacks := callbacks
callbacks = nil
mutex.Unlock()
for _, callback := range currentCallbacks {
callback(result)
}
})
loop.Async(a, cb)
// Execute fn immediately
fn(func(v T) {
result = v
resultReady.Store(true)
a.Send()
})
return func(chain func(T)) {
mutex.Lock()
if resultReady.Load() {
mutex.Unlock()
chain(result)
} else {
callbacks = append(callbacks, chain)
mutex.Unlock()
}
}
}
// -----------------------------------------------------------------------------
func Race[T1 any](futures ...Future[T1]) Future[T1] {
return Async(func(resolve func(T1)) {
done := atomic.Bool{}
for _, future := range futures {
future.Then(func(v T1) {
if !done.Swap(true) {
// Just resolve the first one.
resolve(v)
}
})
}
})
}
func All[T1 any](futures ...Future[T1]) Future[[]T1] {
return Async(func(resolve func([]T1)) {
n := len(futures)
results := make([]T1, n)
var done uint32
for i, future := range futures {
i := i
future.Then(func(v T1) {
results[i] = v
if atomic.AddUint32(&done, 1) == uint32(n) {
// All done.
resolve(results)
}
})
}
})
}

30
x/async/executor_go.go Normal file
View File

@@ -0,0 +1,30 @@
//go:build !llgo
// +build !llgo
/*
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package async
func Run[T any](future Future[T]) T {
ch := make(chan T)
go func() {
future.Then(func(v T) {
ch <- v
})
}()
return <-ch
}

69
x/async/executor_llgo.go Normal file
View File

@@ -0,0 +1,69 @@
//go:build llgo
// +build llgo
/*
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package async
import (
"unsafe"
"github.com/goplus/llgo/c/libuv"
"github.com/goplus/llgo/c/pthread"
)
var execKey pthread.Key
func init() {
execKey.Create(nil)
}
type Executor struct {
L *libuv.Loop
}
func Exec() *Executor {
v := execKey.Get()
if v == nil {
panic("async.Exec: no executor")
}
return (*Executor)(v)
}
func setExec(e *Executor) (old *Executor) {
old = (*Executor)(execKey.Get())
execKey.Set(unsafe.Pointer(e))
return
}
func (e *Executor) Run() {
e.L.Run(libuv.RUN_DEFAULT)
}
func Run[T any](future Future[T]) T {
loop := libuv.LoopNew()
exec := &Executor{loop}
oldExec := setExec(exec)
var ret T
future.Then(func(v T) {
ret = v
})
exec.Run()
loop.Close()
setExec(oldExec)
return ret
}

View File

@@ -0,0 +1,35 @@
//go:build !llgo
// +build !llgo
/*
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package timeout
import (
"time"
"github.com/goplus/llgo/x/async"
)
func Timeout(d time.Duration) async.Future[async.Void] {
return async.Async(func(resolve func(async.Void)) {
go func() {
time.Sleep(d)
resolve(async.Void{})
}()
})
}

View File

@@ -0,0 +1,44 @@
//go:build llgo
// +build llgo
/*
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package timeout
import (
"time"
"github.com/goplus/llgo/c/libuv"
"github.com/goplus/llgo/x/async"
"github.com/goplus/llgo/x/cbind"
)
func Timeout(d time.Duration) async.Future[async.Void] {
return async.Async(func(resolve func(async.Void)) {
t, cb := cbind.BindF[libuv.Timer, libuv.TimerCb](func(t *libuv.Timer) {
resolve(async.Void{})
})
r := libuv.InitTimer(async.Exec().L, t)
if r != 0 {
panic("InitTimer failed")
}
r = t.Start(cb, uint64(d/time.Millisecond), 0)
if r != 0 {
panic("Start failed")
}
})
}

16
x/cbind/buf.go Normal file
View File

@@ -0,0 +1,16 @@
package cbind
import "unsafe"
type slice struct {
data unsafe.Pointer
len int
}
func GoBytes(buf *int8, n int) []byte {
return *(*[]byte)(unsafe.Pointer(&slice{unsafe.Pointer(buf), n}))
}
func CBuffer(data []byte) (*int8, int) {
return (*int8)(unsafe.Pointer(&data[0])), len(data)
}

143
x/cbind/cbind.go Normal file
View File

@@ -0,0 +1,143 @@
/*
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cbind
import (
"unsafe"
)
// llgo:type C
type Cb[T any] func(*T)
// llgo:type C
type Cb1[T any, A any] func(*T, A)
// llgo:type C
type Cb2[T any, A any, B any] func(*T, A, B)
// llgo:type C
type Cb3[T any, A any, B any, C any] func(*T, A, B, C)
type bind[Base any] struct {
b Base
fn func(*Base)
}
type bind1[Base any, A any] struct {
b Base
fn func(*Base, A)
}
type bind2[Base any, A any, B any] struct {
b Base
fn func(*Base, A, B)
}
type bind3[Base any, A any, B any, C any] struct {
b Base
fn func(*Base, A, B, C)
}
func callback[Base any](base *Base) {
bind := (*bind[Base])(unsafe.Pointer(base))
bind.fn(base)
}
func callback1[Base any, A any](base *Base, a A) {
bind := (*bind1[Base, A])(unsafe.Pointer(base))
bind.fn(base, a)
}
func callback2[Base any, A any, B any](base *Base, a A, b B) {
bind := (*bind2[Base, A, B])(unsafe.Pointer(base))
bind.fn(base, a, b)
}
func callback3[Base any, A any, B any, C any](base *Base, a A, b B, c C) {
bind := (*bind3[Base, A, B, C])(unsafe.Pointer(base))
bind.fn(base, a, b, c)
}
/**
* Bind[N] binds a Go function to a C callback function.
*
* Example:
*
* timer, cb := cbind.Bind[libuv.Timer](func() {
* println("hello")
* })
* libuv.InitTimer(async.Exec().L, timer)
* timer.Start(cb, 1000, 0)
*
* @param call The Go function to bind.
* @return The data pointer and the C callback function.
*/
func Bind[T any](call func(*T)) (p *T, cb Cb[T]) {
bb := &bind[T]{fn: call}
p = (*T)(unsafe.Pointer(bb))
cb = callback[T]
return
}
func BindF[T any, F ~func(*T)](call func(*T)) (*T, F) {
bb := &bind[T]{fn: call}
p := (*T)(unsafe.Pointer(bb))
var fn F = callback[T]
return p, fn
}
func Bind1[T any, A any](call func(*T, A)) (p *T, cb Cb1[T, A]) {
bb := &bind1[T, A]{fn: call}
p = (*T)(unsafe.Pointer(bb))
cb = callback1[T, A]
return
}
func Bind1F[T any, F ~func(*T, A), A any](call func(*T, A)) (*T, F) {
bb := &bind1[T, A]{fn: call}
p := (*T)(unsafe.Pointer(bb))
var fn F = callback1[T, A]
return p, fn
}
func Bind2[T any, A any, B any](call func(*T, A, B)) (p *T, cb Cb2[T, A, B]) {
bb := &bind2[T, A, B]{fn: call}
p = (*T)(unsafe.Pointer(bb))
cb = callback2[T, A, B]
return
}
func Bind2F[T any, F ~func(*T, A, B), A any, B any](call func(*T, A, B)) (*T, F) {
bb := &bind2[T, A, B]{fn: call}
p := (*T)(unsafe.Pointer(bb))
var fn F = callback2[T, A, B]
return p, fn
}
func Bind3[T any, A any, B any, C any](call func(*T, A, B, C), a A, b B, c C) (p *T, cb Cb3[T, A, B, C]) {
bb := &bind3[T, A, B, C]{fn: call}
p = (*T)(unsafe.Pointer(bb))
cb = callback3[T, A, B, C]
return
}
func Bind3F[T any, F ~func(*T, A, B, C), A any, B any, C any](call func(*T, A, B, C), a A, b B, c C) (*T, F) {
bb := &bind3[T, A, B, C]{fn: call}
p := (*T)(unsafe.Pointer(bb))
var fn F = callback3[T, A, B, C]
return p, fn
}

View File

@@ -1,290 +0,0 @@
package main
import (
"encoding/json"
"fmt"
"log"
"time"
"github.com/goplus/llgo/x/io"
)
// -----------------------------------------------------------------------------
type Response struct {
StatusCode int
mockBody string
}
func (r *Response) mock(body string) {
r.mockBody = body
}
func (r *Response) Text() (resolve io.Promise[string]) {
resolve(r.mockBody, nil)
return
}
func (r *Response) TextCompiled() *io.PromiseImpl[string] {
P := &io.PromiseImpl[string]{}
P.Func = func(resolve func(string, error)) {
for {
switch P.Prev = P.Next; P.Prev {
case 0:
resolve(r.mockBody, nil)
P.Next = -1
return
default:
panic("Promise already done")
}
}
}
return P
}
func HttpGet(url string, callback func(resp *Response, err error)) {
resp := &Response{StatusCode: 200}
callback(resp, nil)
}
func AsyncHttpGet(url string) (resolve io.Promise[*Response]) {
HttpGet(url, resolve)
return
}
func AsyncHttpGetCompiled(url string) *io.PromiseImpl[*Response] {
P := &io.PromiseImpl[*Response]{}
P.Func = func(resolve func(*Response, error)) {
for {
switch P.Prev = P.Next; P.Prev {
case 0:
HttpGet(url, resolve)
P.Next = -1
return
default:
panic("Promise already done")
}
}
}
return P
}
// -----------------------------------------------------------------------------
type User struct {
Name string
}
func GetUser(uid string) (resolve io.Promise[User]) {
resp, err := AsyncHttpGet("http://example.com/user/" + uid).Await()
if err != nil {
resolve(User{}, err)
return
}
if resp.StatusCode != 200 {
resolve(User{}, fmt.Errorf("http status code: %d", resp.StatusCode))
return
}
resp.mock(`{"name":"Alice"}`)
body, err := resp.Text().Await()
if err != nil {
resolve(User{}, err)
return
}
user := User{}
if err := json.Unmarshal([]byte(body), &user); err != nil {
resolve(User{}, err)
return
}
resolve(user, nil)
return
}
func GetUserCompiled(uid string) *io.PromiseImpl[User] {
var state1 *io.PromiseImpl[*Response]
var state2 *io.PromiseImpl[string]
P := &io.PromiseImpl[User]{}
P.Func = func(resolve func(User, error)) {
for {
switch P.Prev = P.Next; P.Prev {
case 0:
state1 = AsyncHttpGetCompiled("http://example.com/user/" + uid)
P.Next = 1
return
case 1:
state1.EnsureDone()
resp, err := state1.Value, state1.Err
if err != nil {
resolve(User{}, err)
return
}
if resp.StatusCode != 200 {
resolve(User{}, fmt.Errorf("http status code: %d", resp.StatusCode))
return
}
resp.mock(`{"name":"Alice"}`)
state2 = resp.TextCompiled()
P.Next = 2
return
case 2:
state2.EnsureDone()
body, err := state2.Value, state2.Err
if err != nil {
resolve(User{}, err)
return
}
user := User{}
if err := json.Unmarshal([]byte(body), &user); err != nil {
resolve(User{}, err)
return
}
resolve(user, nil)
P.Next = -1
return
default:
panic("Promise already done")
}
}
}
return P
}
func GetScore() *io.Promise[float64] {
panic("todo: GetScore")
}
func GetScoreCompiled() *io.PromiseImpl[float64] {
P := &io.PromiseImpl[float64]{}
P.Func = func(resolve func(float64, error)) {
for {
switch P.Prev = P.Next; P.Prev {
case 0:
panic("todo: GetScore")
default:
panic("Promise already done")
}
}
}
return P
}
func DoUpdate(op string) *io.Promise[io.Void] {
panic("todo: DoUpdate")
}
func DoUpdateCompiled(op string) *io.PromiseImpl[io.Void] {
P := &io.PromiseImpl[io.Void]{}
P.Func = func(resolve func(io.Void, error)) {
for {
switch P.Prev = P.Next; P.Prev {
case 0:
panic("todo: DoUpdate")
default:
panic("Promise already done")
}
}
}
return P
}
func Demo() (resolve io.Promise[io.Void]) {
user, err := GetUser("123").Await()
log.Println(user, err)
user, err = io.Race[User](GetUser("123"), GetUser("456"), GetUser("789")).Await()
log.Println(user, err)
users, err := io.All[User]([]io.AsyncCall[User]{GetUser("123"), GetUser("456"), GetUser("789")}).Await()
log.Println(users, err)
user, score, _, err := io.Await3[User, float64, io.Void](GetUser("123"), GetScore(), DoUpdate("update sth."))
log.Println(user, score, err)
// TODO(lijie): select from multiple promises without channel
select {
case user := <-GetUser("123").Chan():
log.Println("user:", user)
case score := <-GetScore().Chan():
log.Println("score:", score)
case <-io.Timeout(5 * time.Second).Chan():
log.Println("timeout")
}
return
}
func DemoCompiled() *io.PromiseImpl[io.Void] {
var state1 *io.PromiseImpl[User]
var state2 *io.PromiseImpl[User]
var state3 *io.PromiseImpl[[]User]
var state4 *io.PromiseImpl[io.Await3Result[User, float64, io.Void]]
P := &io.PromiseImpl[io.Void]{}
P.Func = func(resolve func(io.Void, error)) {
for {
switch P.Prev = P.Next; P.Prev {
case 0:
state1 = GetUserCompiled("123")
P.Next = 1
return
case 1:
state1.EnsureDone()
user, err := state1.Value, state1.Err
log.Printf("user: %v, err: %v\n", user, err)
state2 = io.Race[User](GetUserCompiled("123"), GetUserCompiled("456"), GetUserCompiled("789"))
P.Next = 2
return
case 2:
state2.EnsureDone()
user, err := state2.Value, state2.Err
log.Println(user, err)
state3 = io.All[User]([]io.AsyncCall[User]{GetUserCompiled("123"), GetUserCompiled("456"), GetUserCompiled("789")})
P.Next = 3
return
case 3:
state3.EnsureDone()
users, err := state3.Value, state3.Err
log.Println(users, err)
state4 = io.Await3Compiled[User, float64, io.Void](GetUserCompiled("123"), GetScoreCompiled(), DoUpdateCompiled("update sth."))
P.Next = 4
return
case 4:
state4.EnsureDone()
user, score, _, err := state4.Value.V1, state4.Value.V2, state4.Value.V3, state4.Value.Err
log.Println(user, score, err)
select {
case user := <-GetUserCompiled("123").Chan():
log.Println("user:", user)
case score := <-GetScoreCompiled().Chan():
log.Println("score:", score)
case <-io.TimeoutCompiled(5 * time.Second).Chan():
log.Println("timeout")
}
P.Next = -1
return
default:
panic("Promise already done")
}
}
}
return P
}
func main() {
log.SetFlags(log.Lshortfile | log.LstdFlags)
// io.Run(Demo())
io.Run(DemoCompiled())
}

View File

@@ -1,170 +0,0 @@
/*
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io
import (
_ "unsafe"
"time"
)
const (
LLGoPackage = "decl"
)
type Void = [0]byte
// -----------------------------------------------------------------------------
type AsyncCall[OutT any] interface {
Await(timeout ...time.Duration) (ret OutT, err error)
Chan() <-chan OutT
EnsureDone()
}
// llgo:link AsyncCall.Await llgo.await
func Await[OutT any](call AsyncCall[OutT], timeout ...time.Duration) (ret OutT, err error) {
return
}
//go:linkname Timeout llgo.timeout
func Timeout(time.Duration) (ret AsyncCall[Void])
func TimeoutCompiled(d time.Duration) *PromiseImpl[Void] {
P := &PromiseImpl[Void]{}
P.Func = func(resolve func(Void, error)) {
go func() {
time.Sleep(d)
resolve(Void{}, nil)
}()
}
return P
}
// llgo:link Race llgo.race
func Race[OutT any](acs ...AsyncCall[OutT]) (ret *PromiseImpl[OutT]) {
return
}
func All[OutT any](acs []AsyncCall[OutT]) (ret *PromiseImpl[[]OutT]) {
return nil
}
// llgo:link Await2 llgo.await
func Await2[OutT1, OutT2 any](
ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2],
timeout ...time.Duration) (ret1 OutT1, ret2 OutT2, err error) {
return
}
type Await2Result[T1 any, T2 any] struct {
V1 T1
V2 T2
Err error
}
func Await2Compiled[OutT1, OutT2 any](
ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2],
timeout ...time.Duration) (ret *PromiseImpl[Await2Result[OutT1, OutT2]]) {
return
}
// llgo:link Await3 llgo.await
func Await3[OutT1, OutT2, OutT3 any](
ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], ac3 AsyncCall[OutT3],
timeout ...time.Duration) (ret1 OutT1, ret2 OutT2, ret3 OutT3, err error) {
return
}
type Await3Result[T1 any, T2 any, T3 any] struct {
V1 T1
V2 T2
V3 T3
Err error
}
func Await3Compiled[OutT1, OutT2, OutT3 any](
ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], ac3 AsyncCall[OutT3],
timeout ...time.Duration) (ret *PromiseImpl[Await3Result[OutT1, OutT2, OutT3]]) {
return
}
func Run(ac AsyncCall[Void]) {
p := ac.(*PromiseImpl[Void])
p.Resume()
<-ac.Chan()
}
// -----------------------------------------------------------------------------
type Promise[OutT any] func(OutT, error)
// llgo:link Promise.Await llgo.await
func (p Promise[OutT]) Await(timeout ...time.Duration) (ret OutT, err error) {
return
}
func (p Promise[OutT]) Chan() <-chan OutT {
return nil
}
func (p Promise[OutT]) EnsureDone() {
}
// -----------------------------------------------------------------------------
type PromiseImpl[TOut any] struct {
Func func(resolve func(TOut, error))
Value TOut
Err error
Prev int
Next int
c chan TOut
}
func (p *PromiseImpl[TOut]) Resume() {
p.Func(func(v TOut, err error) {
p.Value = v
p.Err = err
})
}
func (p *PromiseImpl[TOut]) EnsureDone() {
if p.Next == -1 {
panic("Promise already done")
}
}
func (p *PromiseImpl[TOut]) Chan() <-chan TOut {
if p.c == nil {
p.c = make(chan TOut, 1)
p.Func(func(v TOut, err error) {
p.Value = v
p.Err = err
p.c <- v
})
}
return p.c
}
func (p *PromiseImpl[TOut]) Await(timeout ...time.Duration) (ret TOut, err error) {
panic("should not called")
}
// -----------------------------------------------------------------------------

92
x/socketio/socketio_go.go Normal file
View File

@@ -0,0 +1,92 @@
//go:build !llgo
// +build !llgo
/*
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package socketio
import (
"net"
"github.com/goplus/llgo/x/async"
"github.com/goplus/llgo/x/tuple"
)
type Conn struct {
conn net.Conn
}
func Listen(protocol, bindAddr string, listenCb func(client *Conn, err error)) {
go func() {
listener, err := net.Listen(protocol, bindAddr)
if err != nil {
listenCb(nil, err)
return
}
for {
conn, err := listener.Accept()
if err != nil {
listenCb(nil, err)
return
}
listenCb(&Conn{conn: conn}, nil)
}
}()
}
func Connect(network, addr string) async.Future[tuple.Tuple2[*Conn, error]] {
return async.Async(func(resolve func(tuple.Tuple2[*Conn, error])) {
go func() {
conn, err := net.Dial(network, addr)
if err != nil {
resolve(tuple.T2[*Conn, error](nil, err))
return
}
resolve(tuple.T2[*Conn, error](&Conn{conn: conn}, nil))
}()
})
}
// Read once from the TCP connection.
func (t *Conn) Read() async.Future[tuple.Tuple2[[]byte, error]] {
return async.Async(func(resolve func(tuple.Tuple2[[]byte, error])) {
go func() {
buf := make([]byte, 1024)
n, err := t.conn.Read(buf)
if err != nil {
resolve(tuple.T2[[]byte, error](nil, err))
return
}
resolve(tuple.T2[[]byte, error](buf[:n], nil))
}()
})
}
func (t *Conn) Write(data []byte) async.Future[error] {
return async.Async(func(resolve func(error)) {
go func() {
_, err := t.conn.Write(data)
resolve(err)
}()
})
}
func (t *Conn) Close() {
if t.conn != nil {
t.conn.Close()
}
}

260
x/socketio/socketio_llgo.go Normal file
View File

@@ -0,0 +1,260 @@
//go:build llgo
// +build llgo
/*
* Copyright (c) 2024 The GoPlus Authors (goplus.org). All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package socketio
import (
"strings"
"syscall"
"unsafe"
_ "unsafe"
"github.com/goplus/llgo/c"
"github.com/goplus/llgo/c/libuv"
"github.com/goplus/llgo/c/net"
"github.com/goplus/llgo/x/async"
"github.com/goplus/llgo/x/cbind"
"github.com/goplus/llgo/x/tuple"
)
type Listener struct {
tcp libuv.Tcp
listenCb func(server *Listener, err error)
}
type Conn struct {
tcp libuv.Tcp
readCb func([]byte, error)
}
type libuvError libuv.Errno
func (e libuvError) Error() string {
s := libuv.Strerror(libuv.Errno(e))
return c.GoString(s, c.Strlen(s))
}
type getAddrInfoBind struct {
libuv.GetAddrInfo
resolve func(tuple.Tuple2[*net.SockAddr, error])
}
func getAddrInfoCb(p *libuv.GetAddrInfo, status c.Int, addr *net.AddrInfo) {
bind := (*getAddrInfoBind)(unsafe.Pointer(p))
if status != 0 {
bind.resolve(tuple.T2[*net.SockAddr, error](nil, libuvError(status)))
return
}
bind.resolve(tuple.T2[*net.SockAddr, error](addr.Addr, nil))
}
func parseAddr(addr string) async.Future[tuple.Tuple2[*net.SockAddr, error]] {
return async.Async(func(resolve func(tuple.Tuple2[*net.SockAddr, error])) {
host := "127.0.0.1"
var port string
// split host and service by last colon
idx := strings.LastIndex(addr, ":")
if idx < 0 {
port = addr
} else {
host = addr[:idx]
port = addr[idx+1:]
}
hints := &net.AddrInfo{
Family: net.AF_INET,
SockType: net.SOCK_STREAM,
Protocol: syscall.IPPROTO_TCP,
Flags: 0,
}
req, cb := cbind.Bind2F[libuv.GetAddrInfo, libuv.GetaddrinfoCb](func(i *libuv.GetAddrInfo, status c.Int, addr *net.AddrInfo) {
if status != 0 {
resolve(tuple.T2[*net.SockAddr, error](nil, libuvError(status)))
return
}
resolve(tuple.T2[*net.SockAddr, error](addr.Addr, nil))
})
if res := libuv.Getaddrinfo(async.Exec().L, req, cb, c.AllocaCStr(host), c.AllocaCStr(port), hints); res != 0 {
resolve(tuple.T2[*net.SockAddr, error](nil, libuvError(res)))
return
}
})
}
func Listen(protocol, bindAddr string, listenCb func(client *Conn, err error)) {
tcp, err := newListener()
if err != nil {
listenCb(nil, err)
return
}
parseAddr(bindAddr).Then(func(v tuple.Tuple2[*net.SockAddr, error]) {
addr, err := v.Get()
if err != nil {
listenCb(nil, err)
return
}
if err := tcp.bind(addr, 0); err != nil {
listenCb(nil, err)
return
}
if err := tcp.listen(128, func(server *Listener, err error) {
client, err := server.accept()
listenCb(client, err)
}); err != nil {
listenCb(nil, err)
}
})
}
func newListener() (*Listener, error) {
t := &Listener{}
if res := libuv.InitTcp(async.Exec().L, &t.tcp); res != 0 {
return nil, libuvError(res)
}
return t, nil
}
func (t *Listener) bind(addr *net.SockAddr, flags uint) error {
if res := t.tcp.Bind(addr, c.Uint(flags)); res != 0 {
return libuvError(res)
}
return nil
}
func (l *Listener) listen(backlog int, cb func(server *Listener, err error)) error {
l.listenCb = cb
res := (*libuv.Stream)(&l.tcp).Listen(c.Int(backlog), func(s *libuv.Stream, status c.Int) {
server := (*Listener)(unsafe.Pointer(s))
if status != 0 {
server.listenCb(server, libuvError(libuv.Errno(status)))
} else {
server.listenCb(server, nil)
}
})
if res != 0 {
return libuvError(res)
}
return nil
}
func (l *Listener) accept() (client *Conn, err error) {
tcp := &Conn{}
if res := libuv.InitTcp(async.Exec().L, &tcp.tcp); res != 0 {
return nil, libuvError(res)
}
if res := (*libuv.Stream)(&l.tcp).Accept((*libuv.Stream)(&tcp.tcp)); res != 0 {
return nil, libuvError(res)
}
return tcp, nil
}
func Connect(network, addr string) async.Future[tuple.Tuple2[*Conn, error]] {
return async.Async(func(resolve func(tuple.Tuple2[*Conn, error])) {
parseAddr(addr).Then(func(v tuple.Tuple2[*net.SockAddr, error]) {
addr, err := v.Get()
if err != nil {
resolve(tuple.T2[*Conn, error]((*Conn)(nil), err))
return
}
tcp := &Conn{}
if res := libuv.InitTcp(async.Exec().L, &tcp.tcp); res != 0 {
resolve(tuple.T2[*Conn, error]((*Conn)(nil), libuvError(res)))
return
}
req, cb := cbind.Bind1F[libuv.Connect, libuv.ConnectCb](func(c *libuv.Connect, status c.Int) {
if status != 0 {
resolve(tuple.T2[*Conn, error]((*Conn)(nil), libuvError(libuv.Errno(status))))
} else {
resolve(tuple.T2[*Conn, error](tcp, nil))
}
})
if res := libuv.TcpConnect(req, &tcp.tcp, addr, cb); res != 0 {
resolve(tuple.T2[*Conn, error]((*Conn)(nil), libuvError(res)))
return
}
})
})
}
func allocBuffer(handle *libuv.Handle, suggestedSize uintptr, buf *libuv.Buf) {
buf.Base = (*c.Char)(c.Malloc(suggestedSize))
buf.Len = suggestedSize
}
func (t *Conn) StartRead(fn func(data []byte, err error)) {
t.readCb = func(data []byte, err error) {
fn(data, err)
}
tcp := (*libuv.Stream)(&t.tcp)
res := tcp.StartRead(allocBuffer, func(client *libuv.Stream, nread c.Long, buf *libuv.Buf) {
tcp := (*Conn)(unsafe.Pointer(client))
if nread > 0 {
tcp.readCb(cbind.GoBytes(buf.Base, int(nread)), nil)
} else if nread < 0 {
tcp.readCb(nil, libuvError(libuv.Errno(nread)))
} else {
tcp.readCb(nil, nil)
}
})
if res != 0 {
t.readCb(nil, libuvError(libuv.Errno(res)))
}
}
func (t *Conn) StopRead() error {
tcp := (*libuv.Stream)(&t.tcp)
if res := tcp.StopRead(); res != 0 {
return libuvError(libuv.Errno(res))
}
return nil
}
// Read once from the TCP connection.
func (t *Conn) Read() async.Future[tuple.Tuple2[[]byte, error]] {
return async.Async(func(resolve func(tuple.Tuple2[[]byte, error])) {
t.StartRead(func(data []byte, err error) {
if err := t.StopRead(); err != nil {
panic(err)
}
resolve(tuple.T2[[]byte, error](data, err))
})
})
}
func (t *Conn) Write(data []byte) async.Future[error] {
return async.Async(func(resolve func(error)) {
writer, cb := cbind.Bind1F[libuv.Write, libuv.WriteCb](func(req *libuv.Write, status c.Int) {
var result error
if status != 0 {
result = libuvError(libuv.Errno(status))
}
resolve(result)
})
tcp := (*libuv.Stream)(&t.tcp)
buf, len := cbind.CBuffer(data)
bufs := &libuv.Buf{Base: buf, Len: uintptr(len)}
writer.Write(tcp, bufs, 1, cb)
})
}
func (t *Conn) Close() {
(*libuv.Handle)(unsafe.Pointer(&t.tcp)).Close(nil)
}

40
x/tuple/tuple.go Normal file
View File

@@ -0,0 +1,40 @@
package tuple
type Tuple[T any] struct {
v T
}
func T[T any](v T) Tuple[T] {
return Tuple[T]{v: v}
}
func (t Tuple[T]) Get() T {
return t.v
}
type Tuple2[T1 any, T2 any] struct {
v1 T1
v2 T2
}
func T2[T1 any, T2 any](v1 T1, v2 T2) Tuple2[T1, T2] {
return Tuple2[T1, T2]{v1: v1, v2: v2}
}
func (t Tuple2[T1, T2]) Get() (T1, T2) {
return t.v1, t.v2
}
type Tuple3[T1 any, T2 any, T3 any] struct {
v1 T1
v2 T2
v3 T3
}
func T3[T1 any, T2 any, T3 any](v1 T1, v2 T2, v3 T3) Tuple3[T1, T2, T3] {
return Tuple3[T1, T2, T3]{v1: v1, v2: v2, v3: v3}
}
func (t Tuple3[T1, T2, T3]) Get() (T1, T2, T3) {
return t.v1, t.v2, t.v3
}