diff --git a/cmd/gluetun/main.go b/cmd/gluetun/main.go index 66c10b34..2ad225f7 100644 --- a/cmd/gluetun/main.go +++ b/cmd/gluetun/main.go @@ -271,7 +271,7 @@ func _main(background context.Context, buildInfo models.BuildInformation, go shadowsocksLooper.Run(ctx, wg) if allSettings.HTTPProxy.Enabled { - httpProxyLooper.Restart() + _, _ = httpProxyLooper.SetStatus(constants.Running) } if allSettings.ShadowSocks.Enabled { restartShadowsocks() diff --git a/internal/httpproxy/loop.go b/internal/httpproxy/loop.go index dec31ba8..a3b8e39f 100644 --- a/internal/httpproxy/loop.go +++ b/internal/httpproxy/loop.go @@ -4,109 +4,83 @@ import ( "context" "fmt" "sync" + "time" + "github.com/qdm12/gluetun/internal/constants" + "github.com/qdm12/gluetun/internal/models" "github.com/qdm12/gluetun/internal/settings" "github.com/qdm12/golibs/logging" ) type Looper interface { Run(ctx context.Context, wg *sync.WaitGroup) - Restart() - Start() - Stop() + SetStatus(status models.LoopStatus) (outcome string, err error) + GetStatus() (status models.LoopStatus) GetSettings() (settings settings.HTTPProxy) - SetSettings(settings settings.HTTPProxy) + SetSettings(settings settings.HTTPProxy) (outcome string) } type looper struct { - settings settings.HTTPProxy - settingsMutex sync.RWMutex - logger logging.Logger - restart chan struct{} + state state + // Other objects + logger logging.Logger + // Internal channels and locks + loopLock sync.Mutex + running chan models.LoopStatus + stop, stopped chan struct{} start chan struct{} - stop chan struct{} + backoffTime time.Duration } +const defaultBackoffTime = 10 * time.Second + func NewLooper(logger logging.Logger, settings settings.HTTPProxy) Looper { return &looper{ - settings: settings, - logger: logger.WithPrefix("http proxy: "), - restart: make(chan struct{}), - start: make(chan struct{}), - stop: make(chan struct{}), + state: state{ + status: constants.Stopped, + settings: settings, + }, + logger: logger.WithPrefix("http proxy: "), + start: make(chan struct{}), + running: make(chan models.LoopStatus), + stop: make(chan struct{}), + stopped: make(chan struct{}), + backoffTime: defaultBackoffTime, } } -func (l *looper) GetSettings() (settings settings.HTTPProxy) { - l.settingsMutex.RLock() - defer l.settingsMutex.RUnlock() - return l.settings -} - -func (l *looper) SetSettings(settings settings.HTTPProxy) { - l.settingsMutex.Lock() - defer l.settingsMutex.Unlock() - l.settings = settings -} - -func (l *looper) isEnabled() bool { - l.settingsMutex.RLock() - defer l.settingsMutex.RUnlock() - return l.settings.Enabled -} - -func (l *looper) setEnabled(enabled bool) { - l.settingsMutex.Lock() - defer l.settingsMutex.Unlock() - l.settings.Enabled = enabled -} - -func (l *looper) Restart() { l.restart <- struct{}{} } -func (l *looper) Start() { l.start <- struct{}{} } -func (l *looper) Stop() { l.stop <- struct{}{} } - func (l *looper) Run(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() - waitForStart := true - for waitForStart { - select { - case <-l.stop: - l.logger.Info("not started yet") - case <-l.start: - waitForStart = false - case <-l.restart: - waitForStart = false - case <-ctx.Done(): - return - } + + crashed := false + + select { + case <-l.start: + case <-ctx.Done(): + return } + defer l.logger.Warn("loop exited") for ctx.Err() == nil { - for !l.isEnabled() { - // wait for a signal to re-enable - select { - case <-l.stop: - l.logger.Info("already disabled") - case <-l.restart: - l.setEnabled(true) - case <-l.start: - l.setEnabled(true) - case <-ctx.Done(): - return - } - } + runCtx, runCancel := context.WithCancel(ctx) settings := l.GetSettings() - address := fmt.Sprintf("0.0.0.0:%d", settings.Port) + address := fmt.Sprintf(":%d", settings.Port) + server := New(runCtx, address, l.logger, settings.Stealth, settings.Log, settings.User, settings.Password) - server := New(ctx, address, l.logger, settings.Stealth, settings.Log, settings.User, settings.Password) - - runCtx, runCancel := context.WithCancel(context.Background()) runWg := &sync.WaitGroup{} runWg.Add(1) - // TODO crashed channel - go server.Run(runCtx, runWg) + errorCh := make(chan error) + go server.Run(runCtx, runWg, errorCh) + + if !crashed { + l.running <- constants.Running + crashed = false + } else { + l.backoffTime = defaultBackoffTime + l.state.setStatusWithLock(constants.Running) + } stayHere := true for stayHere { @@ -116,21 +90,38 @@ func (l *looper) Run(ctx context.Context, wg *sync.WaitGroup) { runCancel() runWg.Wait() return - case <-l.restart: // triggered restart - l.logger.Info("restarting") + case <-l.start: + l.logger.Info("starting") runCancel() runWg.Wait() stayHere = false - case <-l.start: - l.logger.Info("already started") case <-l.stop: l.logger.Info("stopping") runCancel() runWg.Wait() - l.setEnabled(false) + l.stopped <- struct{}{} + case err := <-errorCh: + runWg.Wait() + l.state.setStatusWithLock(constants.Crashed) + l.logAndWait(ctx, err) + crashed = true stayHere = false } } runCancel() // repetition for linter only } } + +func (l *looper) logAndWait(ctx context.Context, err error) { + l.logger.Error(err) + l.logger.Info("retrying in %s", l.backoffTime) + timer := time.NewTimer(l.backoffTime) + l.backoffTime *= 2 + select { + case <-timer.C: + case <-ctx.Done(): + if !timer.Stop() { + <-timer.C + } + } +} diff --git a/internal/httpproxy/server.go b/internal/httpproxy/server.go index 955009bc..0942d007 100644 --- a/internal/httpproxy/server.go +++ b/internal/httpproxy/server.go @@ -10,7 +10,7 @@ import ( ) type Server interface { - Run(ctx context.Context, wg *sync.WaitGroup) + Run(ctx context.Context, wg *sync.WaitGroup, errorCh chan<- error) } type server struct { @@ -31,13 +31,13 @@ func New(ctx context.Context, address string, logger logging.Logger, } } -func (s *server) Run(ctx context.Context, wg *sync.WaitGroup) { +func (s *server) Run(ctx context.Context, wg *sync.WaitGroup, errorCh chan<- error) { defer wg.Done() server := http.Server{Addr: s.address, Handler: s.handler} go func() { <-ctx.Done() - s.logger.Warn("context canceled: exiting loop") - defer s.logger.Warn("loop exited") + s.logger.Warn("shutting down server") + defer s.logger.Warn("server shut down") const shutdownGraceDuration = 2 * time.Second shutdownCtx, cancel := context.WithTimeout(context.Background(), shutdownGraceDuration) defer cancel() @@ -47,8 +47,8 @@ func (s *server) Run(ctx context.Context, wg *sync.WaitGroup) { }() s.logger.Info("listening on %s", s.address) err := server.ListenAndServe() - if err != nil && ctx.Err() != context.Canceled { - s.logger.Error(err) + if err != nil && ctx.Err() == nil { + errorCh <- err } s.internalWG.Wait() } diff --git a/internal/httpproxy/state.go b/internal/httpproxy/state.go new file mode 100644 index 00000000..567e4547 --- /dev/null +++ b/internal/httpproxy/state.go @@ -0,0 +1,101 @@ +package httpproxy + +import ( + "fmt" + "reflect" + "sync" + + "github.com/qdm12/gluetun/internal/constants" + "github.com/qdm12/gluetun/internal/models" + "github.com/qdm12/gluetun/internal/settings" +) + +type state struct { + status models.LoopStatus + settings settings.HTTPProxy + statusMu sync.RWMutex + settingsMu sync.RWMutex +} + +func (s *state) setStatusWithLock(status models.LoopStatus) { + s.statusMu.Lock() + defer s.statusMu.Unlock() + s.status = status +} + +func (l *looper) GetStatus() (status models.LoopStatus) { + l.state.statusMu.RLock() + defer l.state.statusMu.RUnlock() + return l.state.status +} + +func (l *looper) SetStatus(status models.LoopStatus) (outcome string, err error) { + l.state.statusMu.Lock() + defer l.state.statusMu.Unlock() + existingStatus := l.state.status + + switch status { + case constants.Running: + switch existingStatus { + case constants.Starting, constants.Running, constants.Stopping, constants.Crashed: + return fmt.Sprintf("already %s", existingStatus), nil + } + l.loopLock.Lock() + defer l.loopLock.Unlock() + l.state.status = constants.Starting + l.state.statusMu.Unlock() + l.start <- struct{}{} + newStatus := <-l.running + l.state.statusMu.Lock() + l.state.status = newStatus + return newStatus.String(), nil + case constants.Stopped: + switch existingStatus { + case constants.Stopped, constants.Stopping, constants.Starting, constants.Crashed: + return fmt.Sprintf("already %s", existingStatus), nil + } + l.loopLock.Lock() + defer l.loopLock.Unlock() + l.state.status = constants.Stopping + l.state.statusMu.Unlock() + l.stop <- struct{}{} + <-l.stopped + l.state.statusMu.Lock() + l.state.status = status + return status.String(), nil + default: + return "", fmt.Errorf("status %q can only be %q or %q", + status, constants.Running, constants.Stopped) + } +} + +func (l *looper) GetSettings() (settings settings.HTTPProxy) { + l.state.settingsMu.RLock() + defer l.state.settingsMu.RUnlock() + return l.state.settings +} + +func (l *looper) SetSettings(settings settings.HTTPProxy) (outcome string) { + l.state.settingsMu.Lock() + settingsUnchanged := reflect.DeepEqual(settings, l.state.settings) + if settingsUnchanged { + l.state.settingsMu.Unlock() + return "settings left unchanged" + } + newEnabled := settings.Enabled + previousEnabled := l.state.settings.Enabled + l.state.settings = settings + l.state.settingsMu.Unlock() + // Either restart or set changed status + switch { + case !newEnabled && !previousEnabled: + case newEnabled && previousEnabled: + _, _ = l.SetStatus(constants.Stopped) + _, _ = l.SetStatus(constants.Running) + case newEnabled && !previousEnabled: + _, _ = l.SetStatus(constants.Running) + case !newEnabled && previousEnabled: + _, _ = l.SetStatus(constants.Stopped) + } + return "settings updated" +}