diff --git a/connector/collector/docker.go b/connector/collector/docker.go index 9f13f76..960e224 100644 --- a/connector/collector/docker.go +++ b/connector/collector/docker.go @@ -1,9 +1,6 @@ package collector import ( - "bufio" - "io" - "github.com/bcicen/ctop/models" api "github.com/fsouza/go-dockerclient" ) @@ -68,29 +65,8 @@ func (c *Docker) Stream() chan models.Metrics { return c.stream } -func (c *Docker) StreamLogs() (chan string, error) { - r, w := io.Pipe() - logCh := make(chan string) - - opts := api.LogsOptions{ - Container: c.id, - OutputStream: w, - ErrorStream: w, - Stdout: true, - Stderr: true, - Tail: "10", - Follow: true, - Timestamps: true, - } - - go tailLogs(r, logCh) - go func() { - err := c.client.Logs(opts) - if err != nil { - log.Errorf("error reading container logs: %s", err) - } - }() - return logCh, nil +func (c *Docker) Logs() LogCollector { + return &DockerLogs{c.id, c.client, make(chan bool)} } // Stop collector @@ -139,10 +115,3 @@ func (c *Docker) ReadIO(stats *api.Stats) { } c.IOBytesRead, c.IOBytesWrite = read, write } - -func tailLogs(reader io.Reader, ch chan string) { - scanner := bufio.NewScanner(reader) - for scanner.Scan() { - ch <- scanner.Text() - } -} diff --git a/connector/collector/docker_logs.go b/connector/collector/docker_logs.go new file mode 100644 index 0000000..33d47a3 --- /dev/null +++ b/connector/collector/docker_logs.go @@ -0,0 +1,60 @@ +package collector + +import ( + "bufio" + "context" + "io" + + api "github.com/fsouza/go-dockerclient" +) + +type DockerLogs struct { + id string + client *api.Client + done chan bool +} + +func (l *DockerLogs) Stream() chan string { + r, w := io.Pipe() + logCh := make(chan string) + ctx, cancel := context.WithCancel(context.Background()) + + opts := api.LogsOptions{ + Context: ctx, + Container: l.id, + OutputStream: w, + ErrorStream: w, + Stdout: true, + Stderr: true, + Tail: "10", + Follow: true, + Timestamps: true, + } + + // read io pipe into channel + go func() { + scanner := bufio.NewScanner(r) + for scanner.Scan() { + logCh <- scanner.Text() + } + }() + + // connect to container log stream + go func() { + err := l.client.Logs(opts) + if err != nil { + log.Errorf("error reading container logs: %s", err) + } + }() + + go func() { + select { + case <-l.done: + cancel() + } + }() + + return logCh +} + +func (l *DockerLogs) Stop() { l.done <- true } diff --git a/connector/collector/main.go b/connector/collector/main.go index a2b8fcb..5be6218 100644 --- a/connector/collector/main.go +++ b/connector/collector/main.go @@ -9,9 +9,14 @@ import ( var log = logging.Init() +type LogCollector interface { + Stream() chan string + Stop() +} + type Collector interface { Stream() chan models.Metrics - StreamLogs() (chan string, error) + Logs() LogCollector Running() bool Start() Stop() diff --git a/connector/collector/mock.go b/connector/collector/mock.go index 7376a71..caacf1b 100644 --- a/connector/collector/mock.go +++ b/connector/collector/mock.go @@ -9,8 +9,6 @@ import ( "github.com/bcicen/ctop/models" ) -const mockLog = "Cura ob pro qui tibi inveni dum qua fit donec amare illic mea, regem falli contexo pro peregrinorum heremo absconditi araneae meminerim deliciosas actionibus facere modico dura sonuerunt psalmi contra rerum, tempus mala anima volebant dura quae o modis." - // Mock collector type Mock struct { models.Metrics @@ -47,15 +45,8 @@ func (c *Mock) Stream() chan models.Metrics { return c.stream } -func (c *Mock) StreamLogs() (chan string, error) { - logCh := make(chan string) - go func() { - for { - logCh <- mockLog - time.Sleep(250 * time.Millisecond) - } - }() - return logCh, nil +func (c *Mock) Logs() LogCollector { + return &MockLogs{make(chan bool)} } func (c *Mock) run() { diff --git a/connector/collector/mock_logs.go b/connector/collector/mock_logs.go new file mode 100644 index 0000000..f9a74b3 --- /dev/null +++ b/connector/collector/mock_logs.go @@ -0,0 +1,29 @@ +package collector + +import ( + "time" +) + +const mockLog = "Cura ob pro qui tibi inveni dum qua fit donec amare illic mea, regem falli contexo pro peregrinorum heremo absconditi araneae meminerim deliciosas actionibus facere modico dura sonuerunt psalmi contra rerum, tempus mala anima volebant dura quae o modis." + +type MockLogs struct { + done chan bool +} + +func (l *MockLogs) Stream() chan string { + logCh := make(chan string) + go func() { + for { + select { + case <-l.done: + break + default: + logCh <- mockLog + time.Sleep(250 * time.Millisecond) + } + } + }() + return logCh +} + +func (l *MockLogs) Stop() { l.done <- true } diff --git a/connector/collector/runc.go b/connector/collector/runc.go index 0bb5f67..21b6af4 100644 --- a/connector/collector/runc.go +++ b/connector/collector/runc.go @@ -3,7 +3,6 @@ package collector import ( - "fmt" "time" "github.com/bcicen/ctop/models" @@ -52,8 +51,8 @@ func (c *Runc) Stream() chan models.Metrics { return c.stream } -func (c *Runc) StreamLogs() (chan string, error) { - return nil, fmt.Errorf("log streaming unavailable for runc collector") +func (c *Runc) Logs() LogCollector { + return nil } func (c *Runc) run() { diff --git a/container/main.go b/container/main.go index 3442f7d..3b93ab7 100644 --- a/container/main.go +++ b/container/main.go @@ -67,6 +67,11 @@ func (c *Container) SetState(s string) { } } +// Return container log collector +func (c *Container) Logs() collector.LogCollector { + return c.collector.Logs() +} + // Read metric stream, updating widgets func (c *Container) Read(stream chan models.Metrics) { go func() {