ctop/dockersource.go
2017-02-28 09:35:33 +11:00

175 lines
3.8 KiB
Go

package main
import (
"sort"
"sync"
"time"
"github.com/bcicen/ctop/config"
"github.com/bcicen/ctop/metrics"
"github.com/fsouza/go-dockerclient"
)
// lock is the package-level read/write mutex that this file takes while the
// container list of a DockerContainerSource is appended to or has entries
// removed. Its zero value is an unlocked mutex.
var lock sync.RWMutex
// ContainerSource is the lookup interface over a collection of containers.
type ContainerSource interface {
	// Get looks up a single container by its string key and reports whether
	// one was found.
	Get(id string) (*Container, bool)
	// All returns every container held by the source.
	All() []*Container
}
// DockerContainerSource keeps a list of containers reported by a Docker
// daemon, one metrics collector per container, and a set of container IDs
// waiting to be refreshed.
type DockerContainerSource struct {
// Docker API client used for the inspect, list and event-listener calls,
// and handed to each metrics collector on creation.
client *docker.Client
// Tracked containers; appended to by refresh and trimmed by del.
containers Containers
// Metrics collectors keyed by container ID.
collectors map[string]metrics.Collector
// Pending refresh marks. Only the presence of a key is used by Loop; the
// value is always written as 1.
needsRefresh map[string]int // container IDs requiring refresh
}
// NewDockerContainerSource builds a DockerContainerSource backed by a Docker
// client created from the "dockerHost" config value. It marks every listed
// container for refresh, then starts the refresh loop and the event watcher
// in their own goroutines before returning. It panics if the client cannot be
// created.
func NewDockerContainerSource() *DockerContainerSource {
	dockerClient, err := docker.NewClient(config.GetVal("dockerHost"))
	if err != nil {
		panic(err)
	}

	src := new(DockerContainerSource)
	src.client = dockerClient
	src.collectors = map[string]metrics.Collector{}
	src.needsRefresh = map[string]int{}

	// queue the existing containers before the background goroutines start
	src.refreshAll()
	go src.Loop()
	go src.watchEvents()

	return src
}
// watchEvents registers a listener for Docker events and processes them until
// the events channel is closed. Only events of type "container" are handled:
//
//   - "start", "die", "pause", "unpause" mark the container ID for refresh
//   - "destroy" removes the container from the list
//
// If the listener cannot be registered the error is logged and the function
// returns instead of waiting on the channel.
func (cm *DockerContainerSource) watchEvents() {
	log.Info("docker event listener starting")
	events := make(chan *docker.APIEvents)
	if err := cm.client.AddEventListener(events); err != nil {
		// the listener was not registered, so no events are expected on the
		// channel; bail out rather than block on it indefinitely
		log.Errorf("docker event listener failed to start: %s", err)
		return
	}

	for e := range events {
		if e.Type != "container" {
			continue
		}
		switch e.Action {
		case "start", "die", "pause", "unpause":
			log.Debugf("handling docker event: action=%s id=%s", e.Action, e.ID)
			// NOTE(review): needsRefresh is also read and modified by Loop,
			// which NewDockerContainerSource runs in a separate goroutine,
			// and no lock is taken here — confirm whether this map needs
			// synchronization.
			cm.needsRefresh[e.ID] = 1
		case "destroy":
			log.Debugf("handling docker event: action=%s id=%s", e.Action, e.ID)
			cm.delByID(e.ID)
		}
	}
}
// refresh inspects the container with the given ID and updates local state
// to match:
//
//   - if inspect returns nil, the container is removed from the list
//   - an unknown container is appended to the list and gets a collector
//   - the container state is set from the inspected status
//   - the collector is started (and its stream attached to the container)
//     when the state is "running", and stopped otherwise
func (cm *DockerContainerSource) refresh(id string) {
	info := cm.inspect(id)
	if info == nil {
		// nothing came back from inspect: drop the container
		cm.delByID(id)
		return
	}

	c, known := cm.Get(id)
	if !known {
		// first sighting: register the container under the lock
		c = NewContainer(id, info.Name)
		lock.Lock()
		cm.containers = append(cm.containers, c)
		lock.Unlock()

		// and give it a collector unless one already exists for this ID
		if _, exists := cm.collectors[id]; !exists {
			cm.collectors[id] = metrics.NewDocker(cm.client, id)
		}
	}

	c.SetState(info.State.Status)

	collector := cm.collectors[c.id]
	if c.state == "running" {
		// running container: make sure metrics are being collected
		if !collector.Running() {
			collector.Start()
			c.Read(collector.Stream())
		}
		return
	}

	// any other state: make sure collection is stopped
	if collector.Running() {
		collector.Stop()
	}
}
// inspect asks the Docker client for the details of the container with the
// given ID. It returns nil when the call fails. Failures other than
// *docker.NoSuchContainer are logged; a missing container is not.
func (cm *DockerContainerSource) inspect(id string) *docker.Container {
	c, err := cm.client.InspectContainer(id)
	if err != nil {
		if _, missing := err.(*docker.NoSuchContainer); !missing {
			// pass the error as an argument, never as the format string:
			// the message may itself contain '%' characters
			log.Errorf("%s", err)
		}
		return nil
	}
	return c
}
// refreshAll lists containers through the Docker client with the All option
// set and marks every returned ID for refresh. It panics if the listing
// fails.
func (cm *DockerContainerSource) refreshAll() {
	listed, err := cm.client.ListContainers(docker.ListContainersOptions{All: true})
	if err != nil {
		panic(err)
	}
	for i := range listed {
		cm.needsRefresh[listed[i].ID] = 1
	}
}
// Loop runs forever. Whenever container IDs are marked in needsRefresh it
// refreshes each of them; when nothing is marked it sleeps for three seconds
// before checking again.
//
// NOTE(review): needsRefresh is also written by watchEvents, which
// NewDockerContainerSource runs in a separate goroutine, and no lock is taken
// on either side — confirm whether this map needs synchronization.
func (cm *DockerContainerSource) Loop() {
	for {
		if len(cm.needsRefresh) == 0 {
			time.Sleep(3 * time.Second)
			continue
		}
		for id := range cm.needsRefresh {
			// Clear the mark before refreshing rather than after the whole
			// batch: a mark set again while the refresh is in progress then
			// survives and triggers another refresh, instead of being wiped
			// together with the one just handled. Removing entries while
			// ranging over a map is permitted.
			delete(cm.needsRefresh, id)
			cm.refresh(id)
		}
	}
}
// Get scans the container list in order and returns the first container
// whose id equals the argument. The boolean is false, and the container nil,
// when no entry matches.
func (cm *DockerContainerSource) Get(id string) (*Container, bool) {
	list := cm.containers
	for i := 0; i < len(list); i++ {
		if candidate := list[i]; candidate.id == id {
			return candidate, true
		}
	}
	return nil, false
}
// delByID removes the first container in the list whose id equals the
// argument by passing its index to del. It does nothing when no container
// matches. The entry for that ID in cm.collectors is not touched here.
func (cm *DockerContainerSource) delByID(id string) {
	list := cm.containers
	for pos := 0; pos < len(list); pos++ {
		if list[pos].id != id {
			continue
		}
		cm.del(pos)
		return
	}
}
// del removes the containers at the given positions of the container list
// while holding the package lock, then logs how many were removed.
//
// All indexes refer to positions in the list as it is when del is called.
// Duplicate indexes are removed once, and indexes outside the list are
// ignored. The caller's index slice is not modified.
func (cm *DockerContainerSource) del(idx ...int) {
	lock.Lock()
	defer lock.Unlock()

	// Sort a private copy in descending order: removing from the highest
	// position down keeps every remaining index valid, whereas removing in
	// argument order shifts the later positions after each removal.
	order := make([]int, len(idx))
	copy(order, idx)
	sort.Sort(sort.Reverse(sort.IntSlice(order)))

	removed := 0
	previous := -1
	for _, i := range order {
		if i < 0 || i >= len(cm.containers) || i == previous {
			continue
		}
		previous = i
		cm.containers = append(cm.containers[:i], cm.containers[i+1:]...)
		removed++
	}
	log.Infof("removed %d dead containers", removed)
}
// All sorts the container list in place, using the ordering that the
// Containers type implements for sort.Sort, and returns it. The returned
// slice is the source's own list, not a copy, and no lock is taken here.
func (cm *DockerContainerSource) All() []*Container {
	list := cm.containers
	sort.Sort(list)
	return list
}