From 22f4924d8eeb9544cb0c8a0f8691ba455bc06e74 Mon Sep 17 00:00:00 2001 From: Li Jie Date: Sun, 16 Feb 2025 20:09:06 +0800 Subject: [PATCH] runtime: safe mutex initialization --- _demo/sync/sync.go | 112 ++++++++++++++++++++ runtime/internal/clite/pthread/sync/sync.go | 8 +- runtime/internal/lib/sync/sync.go | 109 ++++++++++++------- runtime/internal/runtime/z_face.go | 7 +- 4 files changed, 193 insertions(+), 43 deletions(-) create mode 100644 _demo/sync/sync.go diff --git a/_demo/sync/sync.go b/_demo/sync/sync.go new file mode 100644 index 00000000..b88d93a4 --- /dev/null +++ b/_demo/sync/sync.go @@ -0,0 +1,112 @@ +package main + +import ( + "fmt" + "sync" + "sync/atomic" + "time" +) + +// Counter represents a thread-safe counter +type Counter struct { + mu sync.Mutex + value int64 +} + +// Increment increases the counter value by 1 +func (c *Counter) Increment() { + c.mu.Lock() + defer c.mu.Unlock() + c.value++ +} + +// GetValue returns the current value of the counter +func (c *Counter) GetValue() int64 { + c.mu.Lock() + defer c.mu.Unlock() + return c.value +} + +// Constant values for the test +const ( + numGoroutines = 64 + numIterations = 10000 + expectedTotal = numGoroutines * numIterations +) + +func main() { + // Create a new counter instance + counter := &Counter{} + + // Create a wait group to wait for all goroutines to finish + var wg sync.WaitGroup + + // Track active goroutines for monitoring + var activeGoroutines int32 + + // Start time + startTime := time.Now() + + // Launch goroutines + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + atomic.AddInt32(&activeGoroutines, 1) + + go func(id int) { + defer func() { + wg.Done() + atomic.AddInt32(&activeGoroutines, -1) + }() + + // Each goroutine increments the counter numIterations times + for j := 0; j < numIterations; j++ { + counter.Increment() + + // Simulate varying workload with random sleeps + if j%100 == 0 { + time.Sleep(time.Microsecond) + } + } + fmt.Printf("Goroutine %d finished\n", id) + }(i) + } + + // Monitor progress in a separate goroutine + go func() { + for { + active := atomic.LoadInt32(&activeGoroutines) + if active == 0 { + break + } + fmt.Printf("Active goroutines: %d\n", active) + time.Sleep(time.Second) + } + }() + + // Wait for all goroutines to complete + wg.Wait() + + // Calculate execution time + duration := time.Since(startTime) + + // Get and verify the final result + finalValue := counter.GetValue() + fmt.Printf("\nExecution completed in: %v\n", duration) + fmt.Printf("Final counter value: %d\n", finalValue) + fmt.Printf("Expected value: %d\n", expectedTotal) + + // Assert the result + if finalValue != expectedTotal { + panic(fmt.Sprintf("ERROR: Counter value mismatch! Expected %d, got %d\n", + expectedTotal, finalValue)) + } else { + fmt.Printf("SUCCESS: Counter value matches expected total\n") + } + + // Print some statistics + opsPerSecond := float64(expectedTotal) / duration.Seconds() + fmt.Printf("\nStatistics:\n") + fmt.Printf("Operations per second: %.2f\n", opsPerSecond) + fmt.Printf("Average time per operation: %.2f ns\n", + float64(duration.Nanoseconds())/float64(expectedTotal)) +} diff --git a/runtime/internal/clite/pthread/sync/sync.go b/runtime/internal/clite/pthread/sync/sync.go index ececfd75..c74d3214 100644 --- a/runtime/internal/clite/pthread/sync/sync.go +++ b/runtime/internal/clite/pthread/sync/sync.go @@ -142,11 +142,11 @@ func (a *CondAttr) Init(attr *CondAttr) c.Int { return 0 } // llgo:link (*CondAttr).Destroy C.pthread_condattr_destroy func (a *CondAttr) Destroy() {} -// llgo:link (*CondAttr).SetClock C.pthread_condattr_setclock -func (a *CondAttr) SetClock(clock time.ClockidT) c.Int { return 0 } +// // llgo:link (*CondAttr).SetClock C.pthread_condattr_setclock +// func (a *CondAttr) SetClock(clock time.ClockidT) c.Int { return 0 } -// llgo:link (*CondAttr).GetClock C.pthread_condattr_getclock -func (a *CondAttr) GetClock(clock *time.ClockidT) c.Int { return 0 } +// // llgo:link (*CondAttr).GetClock C.pthread_condattr_getclock +// func (a *CondAttr) GetClock(clock *time.ClockidT) c.Int { return 0 } // ----------------------------------------------------------------------------- diff --git a/runtime/internal/lib/sync/sync.go b/runtime/internal/lib/sync/sync.go index e7c250be..07cdec69 100644 --- a/runtime/internal/lib/sync/sync.go +++ b/runtime/internal/lib/sync/sync.go @@ -17,32 +17,49 @@ package sync import ( + "runtime" gosync "sync" - "unsafe" - c "github.com/goplus/llgo/runtime/internal/clite" "github.com/goplus/llgo/runtime/internal/clite/pthread/sync" + "github.com/goplus/llgo/runtime/internal/lib/sync/atomic" ) // llgo:skipall type _sync struct{} +const ( + uninited = 0 + initializing = 1 + inited = 2 +) + // ----------------------------------------------------------------------------- -type Mutex sync.Mutex +type Mutex struct { + sync.Mutex + init int32 +} + +func (m *Mutex) ensureInit() { + for atomic.LoadInt32(&m.init) != inited { + if atomic.CompareAndSwapInt32(&m.init, uninited, initializing) { + (*sync.Mutex)(&m.Mutex).Init(nil) + atomic.StoreInt32(&m.init, inited) + runtime.SetFinalizer(m, func(m *Mutex) { + m.Mutex.Destroy() + }) + } + } +} func (m *Mutex) Lock() { - if *(*c.Long)(unsafe.Pointer(m)) == 0 { - (*sync.Mutex)(m).Init(nil) // TODO(xsw): finalize - } - (*sync.Mutex)(m).Lock() + m.ensureInit() + (*sync.Mutex)(&m.Mutex).Lock() } func (m *Mutex) TryLock() bool { - if *(*c.Long)(unsafe.Pointer(m)) == 0 { - (*sync.Mutex)(m).Init(nil) - } - return (*sync.Mutex)(m).TryLock() == 0 + m.ensureInit() + return (*sync.Mutex)(&m.Mutex).TryLock() == 0 } // llgo:link (*Mutex).Unlock C.pthread_mutex_unlock @@ -50,37 +67,44 @@ func (m *Mutex) Unlock() {} // ----------------------------------------------------------------------------- -type RWMutex sync.RWLock +type RWMutex struct { + sync.RWLock + init int32 +} + +func (m *RWMutex) ensureInit() { + for atomic.LoadInt32(&m.init) != inited { + if atomic.CompareAndSwapInt32(&m.init, uninited, initializing) { + (*sync.RWLock)(&m.RWLock).Init(nil) + atomic.StoreInt32(&m.init, inited) + runtime.SetFinalizer(m, func(m *RWMutex) { + m.RWLock.Destroy() + }) + } + } +} func (rw *RWMutex) RLock() { - if *(*c.Long)(unsafe.Pointer(rw)) == 0 { - (*sync.RWLock)(rw).Init(nil) - } - (*sync.RWLock)(rw).RLock() + rw.ensureInit() + (*sync.RWLock)(&rw.RWLock).RLock() } func (rw *RWMutex) TryRLock() bool { - if *(*c.Long)(unsafe.Pointer(rw)) == 0 { - (*sync.RWLock)(rw).Init(nil) - } - return (*sync.RWLock)(rw).TryRLock() == 0 + rw.ensureInit() + return (*sync.RWLock)(&rw.RWLock).TryRLock() == 0 } // llgo:link (*RWMutex).RUnlock C.pthread_rwlock_unlock func (rw *RWMutex) RUnlock() {} func (rw *RWMutex) Lock() { - if *(*c.Long)(unsafe.Pointer(rw)) == 0 { - (*sync.RWLock)(rw).Init(nil) - } - (*sync.RWLock)(rw).Lock() + rw.ensureInit() + (*sync.RWLock)(&rw.RWLock).Lock() } func (rw *RWMutex) TryLock() bool { - if *(*c.Long)(unsafe.Pointer(rw)) == 0 { - (*sync.RWLock)(rw).Init(nil) - } - return (*sync.RWLock)(rw).TryLock() == 0 + rw.ensureInit() + return (*sync.RWLock)(&rw.RWLock).TryLock() == 0 } // llgo:link (*RWMutex).Unlock C.pthread_rwlock_unlock @@ -113,7 +137,10 @@ type Cond struct { func NewCond(l gosync.Locker) *Cond { ret := &Cond{m: l.(*sync.Mutex)} - ret.cond.Init(nil) // TODO(xsw): finalize + ret.cond.Init(nil) + runtime.SetFinalizer(ret, func(ret *Cond) { + ret.cond.Destroy() + }) return ret } @@ -133,17 +160,29 @@ type WaitGroup struct { mutex sync.Mutex cond sync.Cond count int + init int32 +} + +func (wg *WaitGroup) ensureInit() { + for atomic.LoadInt32(&wg.init) != inited { + if atomic.CompareAndSwapInt32(&wg.init, uninited, initializing) { + wg.doInit() + atomic.StoreInt32(&wg.init, inited) + } + } } func (wg *WaitGroup) doInit() { wg.mutex.Init(nil) - wg.cond.Init(nil) // TODO(xsw): finalize + wg.cond.Init(nil) + runtime.SetFinalizer(wg, func(wg *WaitGroup) { + wg.cond.Destroy() + wg.mutex.Destroy() + }) } func (wg *WaitGroup) Add(delta int) { - if *(*c.Long)(unsafe.Pointer(wg)) == 0 { - wg.doInit() - } + wg.ensureInit() wg.mutex.Lock() wg.count += delta if wg.count <= 0 { @@ -157,9 +196,7 @@ func (wg *WaitGroup) Done() { } func (wg *WaitGroup) Wait() { - if *(*c.Long)(unsafe.Pointer(wg)) == 0 { - wg.doInit() - } + wg.ensureInit() wg.mutex.Lock() for wg.count > 0 { wg.cond.Wait(&wg.mutex) diff --git a/runtime/internal/runtime/z_face.go b/runtime/internal/runtime/z_face.go index 7f1c61c0..af7d10de 100644 --- a/runtime/internal/runtime/z_face.go +++ b/runtime/internal/runtime/z_face.go @@ -312,12 +312,13 @@ var itabTable struct { entries []*Itab } +func init() { + (*sync.Mutex)(&itabTable.mutex).Init(nil) +} + type mutex sync.Mutex func (m *mutex) Lock() { - if *(*c.Long)(unsafe.Pointer(m)) == 0 { - (*sync.Mutex)(m).Init(nil) - } (*sync.Mutex)(m).Lock() }