add LogCollector interface, docker, mock log collectors

This commit is contained in:
Bradley Cicenas 2017-06-27 17:18:17 +00:00
parent 240345d527
commit 17e2c2df8e
7 changed files with 106 additions and 48 deletions

View File

@ -1,9 +1,6 @@
package collector
import (
"bufio"
"io"
"github.com/bcicen/ctop/models"
api "github.com/fsouza/go-dockerclient"
)
@ -68,29 +65,8 @@ func (c *Docker) Stream() chan models.Metrics {
return c.stream
}
func (c *Docker) StreamLogs() (chan string, error) {
r, w := io.Pipe()
logCh := make(chan string)
opts := api.LogsOptions{
Container: c.id,
OutputStream: w,
ErrorStream: w,
Stdout: true,
Stderr: true,
Tail: "10",
Follow: true,
Timestamps: true,
}
go tailLogs(r, logCh)
go func() {
err := c.client.Logs(opts)
if err != nil {
log.Errorf("error reading container logs: %s", err)
}
}()
return logCh, nil
func (c *Docker) Logs() LogCollector {
return &DockerLogs{c.id, c.client, make(chan bool)}
}
// Stop collector
@ -139,10 +115,3 @@ func (c *Docker) ReadIO(stats *api.Stats) {
}
c.IOBytesRead, c.IOBytesWrite = read, write
}
func tailLogs(reader io.Reader, ch chan string) {
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
ch <- scanner.Text()
}
}

View File

@ -0,0 +1,60 @@
package collector
import (
"bufio"
"context"
"io"
api "github.com/fsouza/go-dockerclient"
)
type DockerLogs struct {
id string
client *api.Client
done chan bool
}
func (l *DockerLogs) Stream() chan string {
r, w := io.Pipe()
logCh := make(chan string)
ctx, cancel := context.WithCancel(context.Background())
opts := api.LogsOptions{
Context: ctx,
Container: l.id,
OutputStream: w,
ErrorStream: w,
Stdout: true,
Stderr: true,
Tail: "10",
Follow: true,
Timestamps: true,
}
// read io pipe into channel
go func() {
scanner := bufio.NewScanner(r)
for scanner.Scan() {
logCh <- scanner.Text()
}
}()
// connect to container log stream
go func() {
err := l.client.Logs(opts)
if err != nil {
log.Errorf("error reading container logs: %s", err)
}
}()
go func() {
select {
case <-l.done:
cancel()
}
}()
return logCh
}
func (l *DockerLogs) Stop() { l.done <- true }

View File

@ -9,9 +9,14 @@ import (
var log = logging.Init()
type LogCollector interface {
Stream() chan string
Stop()
}
type Collector interface {
Stream() chan models.Metrics
StreamLogs() (chan string, error)
Logs() LogCollector
Running() bool
Start()
Stop()

View File

@ -9,8 +9,6 @@ import (
"github.com/bcicen/ctop/models"
)
const mockLog = "Cura ob pro qui tibi inveni dum qua fit donec amare illic mea, regem falli contexo pro peregrinorum heremo absconditi araneae meminerim deliciosas actionibus facere modico dura sonuerunt psalmi contra rerum, tempus mala anima volebant dura quae o modis."
// Mock collector
type Mock struct {
models.Metrics
@ -47,15 +45,8 @@ func (c *Mock) Stream() chan models.Metrics {
return c.stream
}
func (c *Mock) StreamLogs() (chan string, error) {
logCh := make(chan string)
go func() {
for {
logCh <- mockLog
time.Sleep(250 * time.Millisecond)
}
}()
return logCh, nil
func (c *Mock) Logs() LogCollector {
return &MockLogs{make(chan bool)}
}
func (c *Mock) run() {

View File

@ -0,0 +1,29 @@
package collector
import (
"time"
)
const mockLog = "Cura ob pro qui tibi inveni dum qua fit donec amare illic mea, regem falli contexo pro peregrinorum heremo absconditi araneae meminerim deliciosas actionibus facere modico dura sonuerunt psalmi contra rerum, tempus mala anima volebant dura quae o modis."
type MockLogs struct {
done chan bool
}
func (l *MockLogs) Stream() chan string {
logCh := make(chan string)
go func() {
for {
select {
case <-l.done:
break
default:
logCh <- mockLog
time.Sleep(250 * time.Millisecond)
}
}
}()
return logCh
}
func (l *MockLogs) Stop() { l.done <- true }

View File

@ -3,7 +3,6 @@
package collector
import (
"fmt"
"time"
"github.com/bcicen/ctop/models"
@ -52,8 +51,8 @@ func (c *Runc) Stream() chan models.Metrics {
return c.stream
}
func (c *Runc) StreamLogs() (chan string, error) {
return nil, fmt.Errorf("log streaming unavailable for runc collector")
func (c *Runc) Logs() LogCollector {
return nil
}
func (c *Runc) run() {

View File

@ -67,6 +67,11 @@ func (c *Container) SetState(s string) {
}
}
// Return container log collector
func (c *Container) Logs() collector.LogCollector {
return c.collector.Logs()
}
// Read metric stream, updating widgets
func (c *Container) Read(stream chan models.Metrics) {
go func() {