async.Run/Await/Race/All
This commit is contained in:
210
x/io/io.go
210
x/io/io.go
@@ -17,154 +17,118 @@
|
||||
package io
|
||||
|
||||
import (
|
||||
"unsafe"
|
||||
_ "unsafe"
|
||||
|
||||
"time"
|
||||
"github.com/goplus/llgo/c"
|
||||
"github.com/goplus/llgo/c/libuv"
|
||||
"github.com/goplus/llgo/c/net"
|
||||
"github.com/goplus/llgo/x/async"
|
||||
"github.com/goplus/llgo/x/cbind"
|
||||
"github.com/goplus/llgo/x/tuple"
|
||||
)
|
||||
|
||||
const (
|
||||
LLGoPackage = "decl"
|
||||
)
|
||||
|
||||
type Void = [0]byte
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
type AsyncCall[OutT any] interface {
|
||||
Await(timeout ...time.Duration) (ret OutT, err error)
|
||||
Chan() <-chan OutT
|
||||
EnsureDone()
|
||||
type Tcp struct {
|
||||
tcp *libuv.Tcp
|
||||
}
|
||||
|
||||
// llgo:link AsyncCall.Await llgo.await
|
||||
func Await[OutT any](call AsyncCall[OutT], timeout ...time.Duration) (ret OutT, err error) {
|
||||
return
|
||||
type libuvError libuv.Errno
|
||||
|
||||
func (e libuvError) Error() string {
|
||||
s := libuv.Strerror(libuv.Errno(e))
|
||||
return c.GoString(s, c.Strlen(s))
|
||||
}
|
||||
|
||||
//go:linkname Timeout llgo.timeout
|
||||
func Timeout(time.Duration) (ret AsyncCall[Void])
|
||||
func NewTcp() *Tcp {
|
||||
t := &Tcp{&libuv.Tcp{}}
|
||||
libuv.InitTcp(async.Exec().L, t.tcp)
|
||||
return t
|
||||
}
|
||||
|
||||
func TimeoutCompiled(d time.Duration) *PromiseImpl[Void] {
|
||||
P := &PromiseImpl[Void]{}
|
||||
P.Func = func(resolve func(Void, error)) {
|
||||
go func() {
|
||||
time.Sleep(d)
|
||||
resolve(Void{}, nil)
|
||||
}()
|
||||
func (t *Tcp) Bind(addr *net.SockAddr, flags uint) error {
|
||||
if res := t.tcp.Bind(addr, c.Uint(flags)); res != 0 {
|
||||
return libuvError(res)
|
||||
}
|
||||
return P
|
||||
}
|
||||
|
||||
// llgo:link Race llgo.race
|
||||
func Race[OutT any](acs ...AsyncCall[OutT]) (ret *PromiseImpl[OutT]) {
|
||||
return
|
||||
}
|
||||
|
||||
func All[OutT any](acs []AsyncCall[OutT]) (ret *PromiseImpl[[]OutT]) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// llgo:link Await2 llgo.await
|
||||
func Await2[OutT1, OutT2 any](
|
||||
ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2],
|
||||
timeout ...time.Duration) (ret1 OutT1, ret2 OutT2, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
type Await2Result[T1 any, T2 any] struct {
|
||||
V1 T1
|
||||
V2 T2
|
||||
Err error
|
||||
}
|
||||
|
||||
func Await2Compiled[OutT1, OutT2 any](
|
||||
ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2],
|
||||
timeout ...time.Duration) (ret *PromiseImpl[Await2Result[OutT1, OutT2]]) {
|
||||
return
|
||||
}
|
||||
|
||||
// llgo:link Await3 llgo.await
|
||||
func Await3[OutT1, OutT2, OutT3 any](
|
||||
ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], ac3 AsyncCall[OutT3],
|
||||
timeout ...time.Duration) (ret1 OutT1, ret2 OutT2, ret3 OutT3, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
type Await3Result[T1 any, T2 any, T3 any] struct {
|
||||
V1 T1
|
||||
V2 T2
|
||||
V3 T3
|
||||
Err error
|
||||
}
|
||||
|
||||
func Await3Compiled[OutT1, OutT2, OutT3 any](
|
||||
ac1 AsyncCall[OutT1], ac2 AsyncCall[OutT2], ac3 AsyncCall[OutT3],
|
||||
timeout ...time.Duration) (ret *PromiseImpl[Await3Result[OutT1, OutT2, OutT3]]) {
|
||||
return
|
||||
}
|
||||
|
||||
func Run(ac AsyncCall[Void]) {
|
||||
p := ac.(*PromiseImpl[Void])
|
||||
p.Resume()
|
||||
<-ac.Chan()
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
type Promise[OutT any] func(OutT, error)
|
||||
|
||||
// llgo:link Promise.Await llgo.await
|
||||
func (p Promise[OutT]) Await(timeout ...time.Duration) (ret OutT, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (p Promise[OutT]) Chan() <-chan OutT {
|
||||
func (t *Tcp) Listen(backlog int, cb libuv.ConnectionCb) error {
|
||||
if res := (*libuv.Stream)(t.tcp).Listen(c.Int(backlog), cb); res != 0 {
|
||||
return libuvError(res)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p Promise[OutT]) EnsureDone() {
|
||||
|
||||
func (t *Tcp) Accept() (client *Tcp, err error) {
|
||||
tcp := &libuv.Tcp{}
|
||||
if res := libuv.InitTcp(async.Exec().L, tcp); res != 0 {
|
||||
return nil, libuvError(res)
|
||||
}
|
||||
if res := (*libuv.Stream)(t.tcp).Accept((*libuv.Stream)(client.tcp)); res != 0 {
|
||||
return nil, libuvError(res)
|
||||
}
|
||||
return &Tcp{tcp}, nil
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
type PromiseImpl[TOut any] struct {
|
||||
Func func(resolve func(TOut, error))
|
||||
Value TOut
|
||||
Err error
|
||||
Prev int
|
||||
Next int
|
||||
|
||||
c chan TOut
|
||||
}
|
||||
|
||||
func (p *PromiseImpl[TOut]) Resume() {
|
||||
p.Func(func(v TOut, err error) {
|
||||
p.Value = v
|
||||
p.Err = err
|
||||
func Connect(addr *net.SockAddr) async.IO[tuple.Tuple2[*Tcp, error]] {
|
||||
return async.Async(func(resolve func(tuple.Tuple2[*Tcp, error])) {
|
||||
tcp := &libuv.Tcp{}
|
||||
if res := libuv.InitTcp(async.Exec().L, tcp); res != 0 {
|
||||
resolve(tuple.T2[*Tcp, error]((*Tcp)(nil), libuvError(res)))
|
||||
return
|
||||
}
|
||||
req, cb := cbind.Bind1[libuv.Connect](func(status c.Int) {
|
||||
if status != 0 {
|
||||
resolve(tuple.T2[*Tcp, error]((*Tcp)(nil), libuvError(libuv.Errno(status))))
|
||||
} else {
|
||||
resolve(tuple.T2[*Tcp, error](&Tcp{tcp}, nil))
|
||||
}
|
||||
})
|
||||
if res := libuv.TcpConnect(req, tcp, addr, cb); res != 0 {
|
||||
resolve(tuple.T2[*Tcp, error]((*Tcp)(nil), libuvError(res)))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (p *PromiseImpl[TOut]) EnsureDone() {
|
||||
if p.Next == -1 {
|
||||
panic("Promise already done")
|
||||
}
|
||||
func allocBuffer(handle *libuv.Handle, suggestedSize uintptr, buf *libuv.Buf) {
|
||||
buf.Base = (*c.Char)(c.Malloc(suggestedSize))
|
||||
buf.Len = suggestedSize
|
||||
}
|
||||
|
||||
func (p *PromiseImpl[TOut]) Chan() <-chan TOut {
|
||||
if p.c == nil {
|
||||
p.c = make(chan TOut, 1)
|
||||
p.Func(func(v TOut, err error) {
|
||||
p.Value = v
|
||||
p.Err = err
|
||||
p.c <- v
|
||||
type slice struct {
|
||||
data unsafe.Pointer
|
||||
len int
|
||||
}
|
||||
|
||||
func goBytes(buf *int8, n int) []byte {
|
||||
return *(*[]byte)(unsafe.Pointer(&slice{unsafe.Pointer(buf), n}))
|
||||
}
|
||||
|
||||
func (t *Tcp) Read() async.IO[tuple.Tuple2[[]byte, error]] {
|
||||
return func(ctx *async.AsyncContext) async.Future[tuple.Tuple2[[]byte, error]] {
|
||||
var result tuple.Tuple2[[]byte, error]
|
||||
var done bool
|
||||
tcp := (*libuv.Stream)(t.tcp)
|
||||
libuv.ReadStart(tcp, allocBuffer, func(client *libuv.Stream, nread c.Long, buf *libuv.Buf) {
|
||||
if nread > 0 {
|
||||
result = tuple.T2[[]byte, error](goBytes(buf.Base, int(nread)), nil)
|
||||
} else if nread < 0 {
|
||||
result = tuple.T2[[]byte, error](nil, libuvError(libuv.Errno(nread)))
|
||||
} else {
|
||||
result = tuple.T2[[]byte, error](nil, nil)
|
||||
}
|
||||
done = true
|
||||
ctx.Complete()
|
||||
})
|
||||
return func() tuple.Tuple2[[]byte, error] {
|
||||
if !done {
|
||||
panic("Tcp.Read: Future accessed before completion")
|
||||
}
|
||||
return result
|
||||
}
|
||||
}
|
||||
return p.c
|
||||
}
|
||||
|
||||
func (p *PromiseImpl[TOut]) Await(timeout ...time.Duration) (ret TOut, err error) {
|
||||
panic("should not called")
|
||||
func (t *Tcp) Close(cb libuv.CloseCb) {
|
||||
(*libuv.Handle)(unsafe.Pointer(t.tcp)).Close(cb)
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
Reference in New Issue
Block a user