Merge pull request #229 from stokito/events

Improve docker events handling
This commit is contained in:
bradley 2020-11-19 11:27:09 -05:00 committed by GitHub
commit ddfff03c05
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -2,6 +2,7 @@ package connector
import ( import (
"fmt" "fmt"
"github.com/op/go-logging"
"strings" "strings"
"sync" "sync"
@ -13,10 +14,26 @@ import (
func init() { enabled["docker"] = NewDocker } func init() { enabled["docker"] = NewDocker }
var actionToStatus = map[string]string{
"create": "created",
"start": "running",
"die": "exited",
"stop": "exited",
"pause": "paused",
"unpause": "running",
}
type StatusUpdate struct {
Cid string
Field string // "status" or "health"
Status string
}
type Docker struct { type Docker struct {
client *api.Client client *api.Client
containers map[string]*container.Container containers map[string]*container.Container
needsRefresh chan string // container IDs requiring refresh needsRefresh chan string // container IDs requiring refresh
statuses chan StatusUpdate
closed chan struct{} closed chan struct{}
lock sync.RWMutex lock sync.RWMutex
} }
@ -31,6 +48,7 @@ func NewDocker() (Connector, error) {
client: client, client: client,
containers: make(map[string]*container.Container), containers: make(map[string]*container.Container),
needsRefresh: make(chan string, 60), needsRefresh: make(chan string, 60),
statuses: make(chan StatusUpdate, 60),
closed: make(chan struct{}), closed: make(chan struct{}),
lock: sync.RWMutex{}, lock: sync.RWMutex{},
} }
@ -48,6 +66,7 @@ func NewDocker() (Connector, error) {
log.Debugf("docker-connector ServerVersion: %s", info.ServerVersion) log.Debugf("docker-connector ServerVersion: %s", info.ServerVersion)
go cm.Loop() go cm.Loop()
go cm.LoopStatuses()
cm.refreshAll() cm.refreshAll()
go cm.watchEvents() go cm.watchEvents()
return cm, nil return cm, nil
@ -67,15 +86,45 @@ func (cm *Docker) watchEvents() {
continue continue
} }
actionName := strings.Split(e.Action, ":")[0] actionName := e.Action
// fast skip all exec_* events: exec_create, exec_start, exec_die
if strings.HasPrefix(actionName, "exec_") {
continue
}
// Action may have additional param i.e. "health_status: healthy"
// We need to strip to have only action name
sepIdx := strings.Index(actionName, ": ")
if sepIdx != -1 {
actionName = actionName[:sepIdx]
}
switch actionName { switch actionName {
case "start", "die", "pause", "unpause", "health_status": // most frequent event is a health checks
log.Debugf("handling docker event: action=%s id=%s", e.Action, e.ID) case "health_status":
healthStatus := e.Action[sepIdx+2:]
if log.IsEnabledFor(logging.DEBUG) {
log.Debugf("handling docker event: action=health_status id=%s %s", e.ID, healthStatus)
}
cm.statuses <- StatusUpdate{e.ID, "health", healthStatus}
case "create":
if log.IsEnabledFor(logging.DEBUG) {
log.Debugf("handling docker event: action=create id=%s", e.ID)
}
cm.needsRefresh <- e.ID cm.needsRefresh <- e.ID
case "destroy": case "destroy":
log.Debugf("handling docker event: action=%s id=%s", e.Action, e.ID) if log.IsEnabledFor(logging.DEBUG) {
log.Debugf("handling docker event: action=destroy id=%s", e.ID)
}
cm.delByID(e.ID) cm.delByID(e.ID)
default:
// check if this action changes status e.g. start -> running
status := actionToStatus[actionName]
if status != "" {
if log.IsEnabledFor(logging.DEBUG) {
log.Debugf("handling docker event: action=%s id=%s %s", actionName, e.ID, status)
}
cm.statuses <- StatusUpdate{e.ID, "status", status}
}
} }
} }
log.Info("docker event listener exited") log.Info("docker event listener exited")
@ -169,6 +218,24 @@ func (cm *Docker) Loop() {
} }
} }
func (cm *Docker) LoopStatuses() {
for {
select {
case statusUpdate := <-cm.statuses:
c, _ := cm.Get(statusUpdate.Cid)
if c != nil {
if statusUpdate.Field == "health" {
c.SetMeta("health", statusUpdate.Status)
} else {
c.SetState(statusUpdate.Status)
}
}
case <-cm.closed:
return
}
}
}
// MustGet gets a single container, creating one anew if not existing // MustGet gets a single container, creating one anew if not existing
func (cm *Docker) MustGet(id string) *container.Container { func (cm *Docker) MustGet(id string) *container.Container {
c, ok := cm.Get(id) c, ok := cm.Get(id)