Maint: rework openvpn restart on unhealthy

This commit is contained in:
Quentin McGaw (desktop)
2021-07-18 03:17:48 +00:00
parent 7e50c95823
commit c0d27b4bfc
5 changed files with 59 additions and 74 deletions

View File

@@ -275,8 +275,6 @@ func _main(ctx context.Context, buildInfo models.BuildInformation,
} }
} // TODO move inside firewall? } // TODO move inside firewall?
healthy := make(chan bool)
// Shutdown settings // Shutdown settings
const defaultShutdownTimeout = 400 * time.Millisecond const defaultShutdownTimeout = 400 * time.Millisecond
defaultShutdownOnSuccess := func(goRoutineName string) { defaultShutdownOnSuccess := func(goRoutineName string) {
@@ -296,7 +294,7 @@ func _main(ctx context.Context, buildInfo models.BuildInformation,
otherGroupHandler := goshutdown.NewGroupHandler("other", defaultGroupSettings) otherGroupHandler := goshutdown.NewGroupHandler("other", defaultGroupSettings)
openvpnLooper := openvpn.NewLooper(allSettings.OpenVPN, nonRootUsername, puid, pgid, allServers, openvpnLooper := openvpn.NewLooper(allSettings.OpenVPN, nonRootUsername, puid, pgid, allServers,
ovpnConf, firewallConf, routingConf, logger, httpClient, os.OpenFile, tunnelReadyCh, healthy) ovpnConf, firewallConf, routingConf, logger, httpClient, os.OpenFile, tunnelReadyCh)
openvpnHandler, openvpnCtx, openvpnDone := goshutdown.NewGoRoutineHandler( openvpnHandler, openvpnCtx, openvpnDone := goshutdown.NewGoRoutineHandler(
"openvpn", goshutdown.GoRoutineSettings{Timeout: time.Second}) "openvpn", goshutdown.GoRoutineSettings{Timeout: time.Second})
// wait for restartOpenvpn // wait for restartOpenvpn
@@ -366,11 +364,12 @@ func _main(ctx context.Context, buildInfo models.BuildInformation,
go httpServer.Run(httpServerCtx, httpServerDone) go httpServer.Run(httpServerCtx, httpServerDone)
controlGroupHandler.Add(httpServerHandler) controlGroupHandler.Add(httpServerHandler)
healthcheckServer := healthcheck.NewServer(constants.HealthcheckAddress, healthLogger := logger.NewChild(logging.Settings{Prefix: "healthcheck: "})
logger.NewChild(logging.Settings{Prefix: "healthcheck: "})) healthcheckServer := healthcheck.NewServer(
constants.HealthcheckAddress, healthLogger, openvpnLooper)
healthServerHandler, healthServerCtx, healthServerDone := goshutdown.NewGoRoutineHandler( healthServerHandler, healthServerCtx, healthServerDone := goshutdown.NewGoRoutineHandler(
"HTTP health server", defaultGoRoutineSettings) "HTTP health server", defaultGoRoutineSettings)
go healthcheckServer.Run(healthServerCtx, healthy, healthServerDone) go healthcheckServer.Run(healthServerCtx, healthServerDone)
const orderShutdownTimeout = 3 * time.Second const orderShutdownTimeout = 3 * time.Second
orderSettings := goshutdown.OrderSettings{ orderSettings := goshutdown.OrderSettings{

View File

@@ -9,24 +9,24 @@ import (
"time" "time"
) )
func (s *server) runHealthcheckLoop(ctx context.Context, healthy chan<- bool, done chan<- struct{}) { func (s *server) runHealthcheckLoop(ctx context.Context, done chan<- struct{}) {
defer close(done) defer close(done)
s.openvpn.healthyTimer = time.NewTimer(defaultOpenvpnHealthyWaitTime)
for { for {
previousErr := s.handler.getErr() previousErr := s.handler.getErr()
err := healthCheck(ctx, s.resolver) err := healthCheck(ctx, s.resolver)
s.handler.setErr(err) s.handler.setErr(err)
// Notify the healthy channel, or not if it's already full
select {
case healthy <- err == nil:
default:
}
if previousErr != nil && err == nil { if previousErr != nil && err == nil {
s.logger.Info("healthy!") s.logger.Info("healthy!")
s.openvpn.healthyTimer.Stop()
s.openvpn.healthyWaitTime = defaultOpenvpnHealthyWaitTime
} else if previousErr == nil && err != nil { } else if previousErr == nil && err != nil {
s.logger.Info("unhealthy: " + err.Error()) s.logger.Info("unhealthy: " + err.Error())
s.openvpn.healthyTimer = time.NewTimer(s.openvpn.healthyWaitTime)
} }
if err != nil { // try again after 1 second if err != nil { // try again after 1 second
@@ -38,9 +38,12 @@ func (s *server) runHealthcheckLoop(ctx context.Context, healthy chan<- bool, do
} }
return return
case <-timer.C: case <-timer.C:
case <-s.openvpn.healthyTimer.C:
s.onUnhealthyOpenvpn(ctx)
} }
continue continue
} }
// Success, check again in 5 seconds // Success, check again in 5 seconds
const period = 5 * time.Second const period = 5 * time.Second
timer := time.NewTimer(period) timer := time.NewTimer(period)

View File

@@ -0,0 +1,17 @@
package healthcheck
import (
"context"
"time"
"github.com/qdm12/gluetun/internal/constants"
)
// onUnhealthyOpenvpn restarts OpenVPN once the program has stayed unhealthy
// for the current healthy wait time. It asks the looper to apply the Stopped
// status and then the Running status, increases the wait time by
// openvpnHealthyWaitTimeAdd and arms a new timer with that longer duration.
//
// The restart is best effort: a failure to stop or to start is logged and
// does not abort the sequence, so the start is still attempted after a failed
// stop and the timer is always re-armed.
//
// NOTE(review): assumes the second result of ApplyStatus is an error and that
// the logger exposes Warn taking a string - confirm against the looper and
// logger interfaces.
func (s *server) onUnhealthyOpenvpn(ctx context.Context) {
	s.logger.Info("program has been unhealthy for " +
		s.openvpn.healthyWaitTime.String() + ": restarting OpenVPN")

	// The outcome strings are not needed here, only the errors are reported.
	if _, err := s.openvpn.looper.ApplyStatus(ctx, constants.Stopped); err != nil {
		s.logger.Warn("cannot stop OpenVPN: " + err.Error())
	}
	if _, err := s.openvpn.looper.ApplyStatus(ctx, constants.Running); err != nil {
		s.logger.Warn("cannot start OpenVPN: " + err.Error())
	}

	// Wait longer before the next restart to give OpenVPN more time to connect.
	s.openvpn.healthyWaitTime += openvpnHealthyWaitTimeAdd
	s.openvpn.healthyTimer = time.NewTimer(s.openvpn.healthyWaitTime)
}

View File

@@ -7,11 +7,12 @@ import (
"net/http" "net/http"
"time" "time"
"github.com/qdm12/gluetun/internal/openvpn"
"github.com/qdm12/golibs/logging" "github.com/qdm12/golibs/logging"
) )
type Server interface { type Server interface {
Run(ctx context.Context, healthy chan<- bool, done chan<- struct{}) Run(ctx context.Context, done chan<- struct{})
} }
type server struct { type server struct {
@@ -19,22 +20,40 @@ type server struct {
logger logging.Logger logger logging.Logger
handler *handler handler *handler
resolver *net.Resolver resolver *net.Resolver
openvpn openvpnHealth
} }
func NewServer(address string, logger logging.Logger) Server { type openvpnHealth struct {
looper openvpn.Looper
healthyWaitTime time.Duration
healthyTimer *time.Timer
}
const (
// defaultOpenvpnHealthyWaitTime is the initial duration the program may stay
// unhealthy before OpenVPN is restarted. The wait time is reset to this value
// when the health check passes again after a failure.
defaultOpenvpnHealthyWaitTime = 6 * time.Second
// openvpnHealthyWaitTimeAdd is added to the wait time after each OpenVPN
// restart triggered by the health check.
openvpnHealthyWaitTimeAdd = 5 * time.Second
)
func NewServer(address string, logger logging.Logger,
openvpnLooper openvpn.Looper) Server {
return &server{ return &server{
address: address, address: address,
logger: logger, logger: logger,
handler: newHandler(logger), handler: newHandler(logger),
resolver: net.DefaultResolver, resolver: net.DefaultResolver,
openvpn: openvpnHealth{
looper: openvpnLooper,
healthyWaitTime: defaultOpenvpnHealthyWaitTime,
},
} }
} }
func (s *server) Run(ctx context.Context, healthy chan<- bool, done chan<- struct{}) { func (s *server) Run(ctx context.Context, done chan<- struct{}) {
defer close(done) defer close(done)
s.logger.Debug("here 0")
loopDone := make(chan struct{}) loopDone := make(chan struct{})
go s.runHealthcheckLoop(ctx, healthy, loopDone) go s.runHealthcheckLoop(ctx, loopDone)
server := http.Server{ server := http.Server{
Addr: s.address, Addr: s.address,

View File

@@ -46,7 +46,6 @@ type looper struct {
client *http.Client client *http.Client
openFile os.OpenFileFunc openFile os.OpenFileFunc
tunnelReady chan<- struct{} tunnelReady chan<- struct{}
healthy <-chan bool
// Internal channels and values // Internal channels and values
stop <-chan struct{} stop <-chan struct{}
stopped chan<- struct{} stopped chan<- struct{}
@@ -55,20 +54,18 @@ type looper struct {
portForwardSignals chan net.IP portForwardSignals chan net.IP
userTrigger bool userTrigger bool
// Internal constant values // Internal constant values
backoffTime time.Duration backoffTime time.Duration
healthWaitTime time.Duration
} }
const ( const (
defaultBackoffTime = 15 * time.Second defaultBackoffTime = 15 * time.Second
defaultHealthWaitTime = 6 * time.Second
) )
func NewLooper(settings configuration.OpenVPN, func NewLooper(settings configuration.OpenVPN,
username string, puid, pgid int, allServers models.AllServers, username string, puid, pgid int, allServers models.AllServers,
conf Configurator, fw firewall.Configurator, routing routing.Routing, conf Configurator, fw firewall.Configurator, routing routing.Routing,
logger logging.ParentLogger, client *http.Client, openFile os.OpenFileFunc, logger logging.ParentLogger, client *http.Client, openFile os.OpenFileFunc,
tunnelReady chan<- struct{}, healthy <-chan bool) Looper { tunnelReady chan<- struct{}) Looper {
start := make(chan struct{}) start := make(chan struct{})
running := make(chan models.LoopStatus) running := make(chan models.LoopStatus)
stop := make(chan struct{}) stop := make(chan struct{})
@@ -90,7 +87,6 @@ func NewLooper(settings configuration.OpenVPN,
client: client, client: client,
openFile: openFile, openFile: openFile,
tunnelReady: tunnelReady, tunnelReady: tunnelReady,
healthy: healthy,
start: start, start: start,
running: running, running: running,
stop: stop, stop: stop,
@@ -98,7 +94,6 @@ func NewLooper(settings configuration.OpenVPN,
portForwardSignals: make(chan net.IP), portForwardSignals: make(chan net.IP),
userTrigger: true, userTrigger: true,
backoffTime: defaultBackoffTime, backoffTime: defaultBackoffTime,
healthWaitTime: defaultHealthWaitTime,
} }
} }
@@ -116,7 +111,7 @@ func (l *looper) signalOrSetStatus(status models.LoopStatus) {
} }
} }
func (l *looper) Run(ctx context.Context, done chan<- struct{}) { //nolint:gocognit func (l *looper) Run(ctx context.Context, done chan<- struct{}) {
defer close(done) defer close(done)
select { select {
@@ -243,25 +238,6 @@ func (l *looper) Run(ctx context.Context, done chan<- struct{}) { //nolint:gocog
stayHere = false stayHere = false
l.state.Unlock() l.state.Unlock()
case healthy := <-l.healthy:
if healthy {
continue
}
// ensure it stays unhealthy for some time before restarting it
healthy = l.waitForHealth(ctx)
if healthy || ctx.Err() != nil {
continue
}
l.logger.Warn("unhealthy program: restarting openvpn")
l.state.SetStatus(constants.Stopping)
openvpnCancel()
<-waitError
close(waitError)
closeStreams()
<-portForwardDone
l.state.SetStatus(constants.Stopped)
stayHere = false
} }
} }
openvpnCancel() openvpnCancel()
@@ -284,35 +260,6 @@ func (l *looper) logAndWait(ctx context.Context, err error) {
} }
} }
// waitForHealth blocks until a healthy signal is received, the current
// health wait time elapses, or ctx is done, whichever happens first.
// It is used before restarting openvpn so that openvpn, which requires
// a few seconds to connect, does not get restarted in a loop.
// The wait time is doubled on every call and set back to
// defaultHealthWaitTime once a healthy signal is received.
// It returns true only if a healthy signal was received in time.
func (l *looper) waitForHealth(ctx context.Context) (healthy bool) {
	l.logger.Info("unhealthy program: waiting %s for it to change to healthy", l.healthWaitTime)
	deadline := time.NewTimer(l.healthWaitTime)
	l.healthWaitTime *= 2

	// stopDeadline stops the timer and drains its channel if it already fired.
	stopDeadline := func() {
		if !deadline.Stop() {
			<-deadline.C
		}
	}

	for {
		select {
		case <-ctx.Done():
			stopDeadline()
			return false
		case <-deadline.C:
			return false
		case healthy = <-l.healthy:
			if healthy {
				stopDeadline()
				l.healthWaitTime = defaultHealthWaitTime
				return true
			}
			// Still unhealthy: keep waiting for the next signal.
		}
	}
}
// portForward is a blocking operation which may or may not be infinite. // portForward is a blocking operation which may or may not be infinite.
// You should therefore always call it in a goroutine. // You should therefore always call it in a goroutine.
func (l *looper) portForward(ctx context.Context, func (l *looper) portForward(ctx context.Context,