From 240345d527cffd806b325f5e77154b166feef482 Mon Sep 17 00:00:00 2001 From: Bradley Cicenas Date: Tue, 27 Jun 2017 16:21:16 +0000 Subject: [PATCH] add StreamLogs() to collector interface --- connector/collector/docker.go | 35 +++++++++++++++++++++++++++++++++++ connector/collector/main.go | 9 +++++++++ connector/collector/mock.go | 13 +++++++++++++ connector/collector/runc.go | 5 +++++ container/main.go | 5 +++-- models/main.go | 7 ------- 6 files changed, 65 insertions(+), 9 deletions(-) diff --git a/connector/collector/docker.go b/connector/collector/docker.go index 03c18de..9f13f76 100644 --- a/connector/collector/docker.go +++ b/connector/collector/docker.go @@ -1,6 +1,9 @@ package collector import ( + "bufio" + "io" + "github.com/bcicen/ctop/models" api "github.com/fsouza/go-dockerclient" ) @@ -65,6 +68,31 @@ func (c *Docker) Stream() chan models.Metrics { return c.stream } +func (c *Docker) StreamLogs() (chan string, error) { + r, w := io.Pipe() + logCh := make(chan string) + + opts := api.LogsOptions{ + Container: c.id, + OutputStream: w, + ErrorStream: w, + Stdout: true, + Stderr: true, + Tail: "10", + Follow: true, + Timestamps: true, + } + + go tailLogs(r, logCh) + go func() { + err := c.client.Logs(opts) + if err != nil { + log.Errorf("error reading container logs: %s", err) + } + }() + return logCh, nil +} + // Stop collector func (c *Docker) Stop() { c.done <- true @@ -111,3 +139,10 @@ func (c *Docker) ReadIO(stats *api.Stats) { } c.IOBytesRead, c.IOBytesWrite = read, write } + +func tailLogs(reader io.Reader, ch chan string) { + scanner := bufio.NewScanner(reader) + for scanner.Scan() { + ch <- scanner.Text() + } +} diff --git a/connector/collector/main.go b/connector/collector/main.go index c7b98be..a2b8fcb 100644 --- a/connector/collector/main.go +++ b/connector/collector/main.go @@ -4,10 +4,19 @@ import ( "math" "github.com/bcicen/ctop/logging" + "github.com/bcicen/ctop/models" ) var log = logging.Init() +type Collector interface { + Stream() chan models.Metrics + StreamLogs() (chan string, error) + Running() bool + Start() + Stop() +} + func round(num float64) int { return int(num + math.Copysign(0.5, num)) } diff --git a/connector/collector/mock.go b/connector/collector/mock.go index 972e6f5..7376a71 100644 --- a/connector/collector/mock.go +++ b/connector/collector/mock.go @@ -9,6 +9,8 @@ import ( "github.com/bcicen/ctop/models" ) +const mockLog = "Cura ob pro qui tibi inveni dum qua fit donec amare illic mea, regem falli contexo pro peregrinorum heremo absconditi araneae meminerim deliciosas actionibus facere modico dura sonuerunt psalmi contra rerum, tempus mala anima volebant dura quae o modis." + // Mock collector type Mock struct { models.Metrics @@ -45,6 +47,17 @@ func (c *Mock) Stream() chan models.Metrics { return c.stream } +func (c *Mock) StreamLogs() (chan string, error) { + logCh := make(chan string) + go func() { + for { + logCh <- mockLog + time.Sleep(250 * time.Millisecond) + } + }() + return logCh, nil +} + func (c *Mock) run() { c.running = true rand.Seed(int64(time.Now().Nanosecond())) diff --git a/connector/collector/runc.go b/connector/collector/runc.go index 905c018..0bb5f67 100644 --- a/connector/collector/runc.go +++ b/connector/collector/runc.go @@ -3,6 +3,7 @@ package collector import ( + "fmt" "time" "github.com/bcicen/ctop/models" @@ -51,6 +52,10 @@ func (c *Runc) Stream() chan models.Metrics { return c.stream } +func (c *Runc) StreamLogs() (chan string, error) { + return nil, fmt.Errorf("log streaming unavailable for runc collector") +} + func (c *Runc) run() { c.running = true defer close(c.stream) diff --git a/container/main.go b/container/main.go index 4f004b6..3442f7d 100644 --- a/container/main.go +++ b/container/main.go @@ -1,6 +1,7 @@ package container import ( + "github.com/bcicen/ctop/connector/collector" "github.com/bcicen/ctop/cwidgets" "github.com/bcicen/ctop/cwidgets/compact" "github.com/bcicen/ctop/logging" @@ -19,10 +20,10 @@ type Container struct { Widgets *compact.Compact Display bool // display this container in compact view updater cwidgets.WidgetUpdater - collector models.Collector + collector collector.Collector } -func New(id string, collector models.Collector) *Container { +func New(id string, collector collector.Collector) *Container { widgets := compact.NewCompact(id) return &Container{ Metrics: models.NewMetrics(), diff --git a/models/main.go b/models/main.go index fb1ddc3..ff20a6a 100644 --- a/models/main.go +++ b/models/main.go @@ -24,10 +24,3 @@ func NewMetrics() Metrics { Pids: -1, } } - -type Collector interface { - Stream() chan Metrics - Running() bool - Start() - Stop() -}