diff --git a/x/socketio/README.md b/x/async/README.md similarity index 97% rename from x/socketio/README.md rename to x/async/README.md index da6a4c8e..f8ccc110 100644 --- a/x/socketio/README.md +++ b/x/async/README.md @@ -372,18 +372,22 @@ Introduce `async.IO[T]` type to represent an asynchronous operation, `async.Futu ```go package async -type Future[T any] func() T -type IO[T any] func() Future[T] +type Future[T any] func(func(T)) + +func Await[T any](future Future[T]) T func main() { - io := func() Future[string] { - return func() string { - return "Hello, World!" + hello := func() Future[string] { + return func(resolve func(string)) { + resolve("Hello, World!") } } - future := io() - value := future() - println(value) + future := hello() + future(func(value string) { + println(value) + }) + + println(Await(future)) } ``` diff --git a/x/async/TODO.md b/x/async/TODO.md new file mode 100644 index 00000000..78a2373a --- /dev/null +++ b/x/async/TODO.md @@ -0,0 +1,16 @@ +讨论: + +1. Future 用 interface 还是闭包:性能应该差不多,如果没有其他方法要暴露,感觉也没有换成 interface 的必要。 +2. 几个方法提供不同参数个数的版本还是用 tuple:如果编译器不支持可变泛型参数个数和特化,我倾向用 tuple 先简化实现,tuple 的开销应该也容易被编译器优化掉。多个方法让用户选择 Await2/Await3 这种也恶心。 +3. 是否 Cancellable,暂时不加进去,多一个 context,也不一定能快速稳定下来,可以后面根据实践再改。 +4. Executor 可能会变化,目前提供的 Run 是阻塞的,也可以把它做成异步。 +5. 尽量再隐藏一些辅助类型,比如 TupleN,可能之提供 tuple 的构造和返回多值。内部的 libuv 如果隐藏可能要暴露同等接口,先不动了 +6. 性能可能做个简单测试,但不是关键,只要别太差。未来可能会尽量减少 executor 的切换、尽量多并行 +7. 异常兼容性:目前没考虑,这个要在回调里处理可能困难,要么就在 await 上处理,可以往后放一下,毕竟 golang 主要是以 error 为主 +8. 可能先看一下如何在 go+里面集成,判断目前的设计实现是否合理 +9. 多封装一些库看看通用性和易用性,\_demo 里几个简单例子基本符合预期,还需要更多检验 + +TODO: + +10. select 兼容 (可能把 Future 改为 interface 更合理?) +11. Future 只会被执行一次 diff --git a/x/async/_demo/all/all.go b/x/async/_demo/all/all.go index 0f78e648..004ba402 100644 --- a/x/async/_demo/all/all.go +++ b/x/async/_demo/all/all.go @@ -32,7 +32,7 @@ func WriteFile(fileName string, content []byte) async.Future[error] { func sleep(i int, d time.Duration) async.Future[int] { return async.Async(func(resolve func(int)) { - timeout.Timeout(d)(func(async.Void) { + timeout.Timeout(d).Then(func(async.Void) { resolve(i) }) }) @@ -70,7 +70,7 @@ func RunIO() { println("RunIO with BindIO") async.Run(async.Async(func(resolve func(async.Void)) { - ReadFile("all.go")(func(v tuple.Tuple2[[]byte, error]) { + ReadFile("all.go").Then(func(v tuple.Tuple2[[]byte, error]) { content, err := v.Get() if err != nil { fmt.Printf("read err: %v\n", err) @@ -78,7 +78,7 @@ func RunIO() { return } fmt.Printf("read content: %s\n", content) - WriteFile("2.out", content)(func(v error) { + WriteFile("2.out", content).Then(func(v error) { err = v if err != nil { fmt.Printf("write err: %v\n", err) @@ -100,7 +100,7 @@ func RunAllAndRace() { println("Run All with Await") async.Run(async.Async(func(resolve func(async.Void)) { - async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300))(func(v []int) { + async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)).Then(func(v []int) { fmt.Printf("All: %v\n", v) resolve(async.Void{}) }) @@ -120,7 +120,7 @@ func RunAllAndRace() { println("Run All with BindIO") async.Run(async.Async(func(resolve func(async.Void)) { - async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300))(func(v []int) { + async.All(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)).Then(func(v []int) { fmt.Printf("All: %v\n", v) resolve(async.Void{}) }) @@ -129,7 +129,7 @@ func RunAllAndRace() { println("Run Race with BindIO") async.Run(async.Async(func(resolve func(async.Void)) { - async.Race(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300))(func(v int) { + async.Race(sleep(1, ms200), sleep(2, ms100), sleep(3, ms300)).Then(func(v int) { fmt.Printf("Race: %v\n", v) resolve(async.Void{}) }) @@ -152,7 +152,7 @@ func RunTimeout() { async.Run(async.Async(func(resolve func(async.Void)) { fmt.Printf("Start 100 ms timeout\n") - timeout.Timeout(100 * time.Millisecond)(func(async.Void) { + timeout.Timeout(100 * time.Millisecond).Then(func(async.Void) { fmt.Printf("timeout\n") resolve(async.Void{}) }) @@ -165,15 +165,15 @@ func RunSocket() { async.Run(async.Async(func(resolve func(async.Void)) { println("RunServer") - RunServer()(func(async.Void) { + RunServer().Then(func(async.Void) { println("RunServer done") resolve(async.Void{}) }) println("RunClient") - timeout.Timeout(100 * time.Millisecond)(func(async.Void) { - RunClient()(func(async.Void) { + timeout.Timeout(100 * time.Millisecond).Then(func(async.Void) { + RunClient().Then(func(async.Void) { println("RunClient done") resolve(async.Void{}) }) @@ -184,7 +184,7 @@ func RunSocket() { func RunClient() async.Future[async.Void] { return async.Async(func(resolve func(async.Void)) { addr := "127.0.0.1:3927" - socketio.Connect("tcp", addr)(func(v tuple.Tuple2[*socketio.Conn, error]) { + socketio.Connect("tcp", addr).Then(func(v tuple.Tuple2[*socketio.Conn, error]) { client, err := v.Get() println("Connected", client, err) if err != nil { @@ -195,17 +195,17 @@ func RunClient() async.Future[async.Void] { loop = func(client *socketio.Conn) { counter++ data := fmt.Sprintf("Hello %d", counter) - client.Write([]byte(data))(func(err error) { + client.Write([]byte(data)).Then(func(err error) { if err != nil { panic(err) } - client.Read()(func(v tuple.Tuple2[[]byte, error]) { + client.Read().Then(func(v tuple.Tuple2[[]byte, error]) { data, err := v.Get() if err != nil { panic(err) } println("Read from server:", string(data)) - timeout.Timeout(1 * time.Second)(func(async.Void) { + timeout.Timeout(1 * time.Second).Then(func(async.Void) { loop(client) }) }) @@ -222,13 +222,13 @@ func RunServer() async.Future[async.Void] { println("Client connected", client, err) var loop func(client *socketio.Conn) loop = func(client *socketio.Conn) { - client.Read()(func(v tuple.Tuple2[[]byte, error]) { + client.Read().Then(func(v tuple.Tuple2[[]byte, error]) { data, err := v.Get() if err != nil { println("Read error", err) } else { println("Read from client:", string(data)) - client.Write(data)(func(err error) { + client.Write(data).Then(func(err error) { if err != nil { println("Write error", err) } else { diff --git a/x/async/async.go b/x/async/async.go index fcd9a2e9..6b93f0e7 100644 --- a/x/async/async.go +++ b/x/async/async.go @@ -24,6 +24,10 @@ type Void = [0]byte type Future[T any] func(func(T)) +func (f Future[T]) Then(cb func(T)) { + f(cb) +} + // Just for pure LLGo/Go, transpile to callback in Go+ func Await[T1 any](future Future[T1]) T1 { return Run(future) diff --git a/x/async/async_go.go b/x/async/async_go.go index c775b7e8..47f691c3 100644 --- a/x/async/async_go.go +++ b/x/async/async_go.go @@ -34,7 +34,7 @@ func Race[T1 any](futures ...Future[T1]) Future[T1] { ch := make(chan T1) for _, future := range futures { future := future - future(func(v T1) { + future.Then(func(v T1) { defer func() { // Avoid panic when the channel is closed. _ = recover() @@ -56,7 +56,7 @@ func All[T1 any](futures ...Future[T1]) Future[[]T1] { wg.Add(n) for i, future := range futures { i := i - future(func(v T1) { + future.Then(func(v T1) { results[i] = v wg.Done() }) diff --git a/x/async/async_llgo.go b/x/async/async_llgo.go index d301b20c..9a7c7285 100644 --- a/x/async/async_llgo.go +++ b/x/async/async_llgo.go @@ -53,7 +53,7 @@ func Race[T1 any](futures ...Future[T1]) Future[T1] { return Async(func(resolve func(T1)) { done := atomic.Bool{} for _, future := range futures { - future(func(v T1) { + future.Then(func(v T1) { if !done.Swap(true) { // Just resolve the first one. resolve(v) @@ -70,7 +70,7 @@ func All[T1 any](futures ...Future[T1]) Future[[]T1] { var done uint32 for i, future := range futures { i := i - future(func(v T1) { + future.Then(func(v T1) { results[i] = v if atomic.AddUint32(&done, 1) == uint32(n) { // All done. diff --git a/x/async/executor_go.go b/x/async/executor_go.go index 60962638..38667aa7 100644 --- a/x/async/executor_go.go +++ b/x/async/executor_go.go @@ -22,7 +22,7 @@ package async func Run[T any](future Future[T]) T { ch := make(chan T) go func() { - future(func(v T) { + future.Then(func(v T) { ch <- v }) }() diff --git a/x/async/executor_llgo.go b/x/async/executor_llgo.go index e1e03a6a..af28295f 100644 --- a/x/async/executor_llgo.go +++ b/x/async/executor_llgo.go @@ -59,7 +59,7 @@ func Run[T any](future Future[T]) T { exec := &Executor{loop} oldExec := setExec(exec) var ret T - future(func(v T) { + future.Then(func(v T) { ret = v }) exec.Run() diff --git a/x/socketio/socketio_llgo.go b/x/socketio/socketio_llgo.go index e6b799a5..970e70c6 100644 --- a/x/socketio/socketio_llgo.go +++ b/x/socketio/socketio_llgo.go @@ -104,7 +104,7 @@ func Listen(protocol, bindAddr string, listenCb func(client *Conn, err error)) { listenCb(nil, err) return } - parseAddr(bindAddr)(func(v tuple.Tuple2[*net.SockAddr, error]) { + parseAddr(bindAddr).Then(func(v tuple.Tuple2[*net.SockAddr, error]) { addr, err := v.Get() if err != nil { listenCb(nil, err) @@ -167,7 +167,7 @@ func (l *Listener) accept() (client *Conn, err error) { func Connect(network, addr string) async.Future[tuple.Tuple2[*Conn, error]] { return async.Async(func(resolve func(tuple.Tuple2[*Conn, error])) { - parseAddr(addr)(func(v tuple.Tuple2[*net.SockAddr, error]) { + parseAddr(addr).Then(func(v tuple.Tuple2[*net.SockAddr, error]) { addr, err := v.Get() if err != nil { resolve(tuple.T2[*Conn, error]((*Conn)(nil), err))