chore(portforward): improve loop reliability
- handle settings update within run function - signal back start result to update call - update loop settings only when service is started
This commit is contained in:
@@ -27,8 +27,9 @@ type Loop struct {
|
||||
// when performing an update
|
||||
runCtx context.Context //nolint:containedctx
|
||||
runCancel context.CancelFunc
|
||||
updatedSignal chan<- struct{}
|
||||
runDone <-chan struct{}
|
||||
updateTrigger chan<- service.Settings
|
||||
updatedResult <-chan error
|
||||
}
|
||||
|
||||
func NewLoop(settings settings.PortForwarding, routing Routing,
|
||||
@@ -52,26 +53,39 @@ func (l *Loop) Start(_ context.Context) (runError <-chan error, _ error) {
|
||||
runDone := make(chan struct{})
|
||||
l.runDone = runDone
|
||||
|
||||
updatedSignal := make(chan struct{})
|
||||
l.updatedSignal = updatedSignal
|
||||
updateTrigger := make(chan service.Settings)
|
||||
l.updateTrigger = updateTrigger
|
||||
updateResult := make(chan error)
|
||||
l.updatedResult = updateResult
|
||||
runErrorCh := make(chan error)
|
||||
|
||||
go l.run(l.runCtx, runDone, runErrorCh, updatedSignal)
|
||||
go l.run(l.runCtx, runDone, runErrorCh,
|
||||
l.settings, updateTrigger, updateResult)
|
||||
|
||||
return runErrorCh, nil
|
||||
}
|
||||
|
||||
func (l *Loop) run(runCtx context.Context, runDone chan<- struct{},
|
||||
runErrorCh chan<- error, updatedSignal <-chan struct{}) {
|
||||
runErrorCh chan<- error, initialSettings service.Settings,
|
||||
updateTrigger <-chan service.Settings, updateResult chan<- error) {
|
||||
defer close(runDone)
|
||||
|
||||
settings := initialSettings
|
||||
var serviceRunError <-chan error
|
||||
for {
|
||||
updateReceived := false
|
||||
select {
|
||||
case <-runCtx.Done():
|
||||
// Stop call takes care of stopping the service
|
||||
return
|
||||
case <-updatedSignal: // first and subsequent start trigger
|
||||
case partialUpdate := <-updateTrigger:
|
||||
updatedSettings, err := settings.UpdateWith(partialUpdate)
|
||||
if err != nil {
|
||||
updateResult <- err
|
||||
continue
|
||||
}
|
||||
settings = updatedSettings
|
||||
updateReceived = true
|
||||
case err := <-serviceRunError:
|
||||
l.logger.Error(err.Error())
|
||||
}
|
||||
@@ -85,37 +99,40 @@ func (l *Loop) run(runCtx context.Context, runDone chan<- struct{},
|
||||
}
|
||||
}
|
||||
|
||||
l.settingsMutex.RLock()
|
||||
l.service = service.New(l.settings, l.routing, l.client,
|
||||
l.service = service.New(settings, l.routing, l.client,
|
||||
l.portAllower, l.logger, l.uid, l.gid)
|
||||
l.settingsMutex.RUnlock()
|
||||
|
||||
var err error
|
||||
serviceRunError, err = l.service.Start(runCtx)
|
||||
if updateReceived {
|
||||
// Signal to the Update call that the service has started
|
||||
// and if it failed to start.
|
||||
updateResult <- err
|
||||
}
|
||||
if err != nil {
|
||||
if runCtx.Err() == nil { // crashed but NOT stopped
|
||||
runErrorCh <- fmt.Errorf("starting new service: %w", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Service is created and started successfully, so update
|
||||
// the settings for external calls such as GetSettings.
|
||||
l.settingsMutex.Lock()
|
||||
l.settings = settings
|
||||
l.settingsMutex.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (l *Loop) UpdateWith(partialUpdate service.Settings) (err error) {
|
||||
l.settingsMutex.Lock()
|
||||
l.settings, err = l.settings.UpdateWith(partialUpdate)
|
||||
l.settingsMutex.Unlock()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
select {
|
||||
case l.updatedSignal <- struct{}{}:
|
||||
// Settings are validated and if the service fails to start
|
||||
// or crashes at runtime, the loop will stop and signal its
|
||||
// parent goroutine. Settings validation should be the only
|
||||
// error feedback for the caller of `Update`.
|
||||
return nil
|
||||
case l.updateTrigger <- partialUpdate:
|
||||
select {
|
||||
case err = <-l.updatedResult:
|
||||
return err
|
||||
case <-l.runCtx.Done():
|
||||
return l.runCtx.Err()
|
||||
}
|
||||
case <-l.runCtx.Done():
|
||||
// loop has been stopped, no update can be done
|
||||
return l.runCtx.Err()
|
||||
|
||||
Reference in New Issue
Block a user