@@ -11,9 +11,13 @@ import (
1111 "time"
1212
1313 "github.com/mcuadros/go-defaults"
14+ "github.com/mitchellh/mapstructure"
1415 batchv1 "k8s.io/api/batch/v1"
1516 corev1 "k8s.io/api/core/v1"
1617 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18+ "k8s.io/apimachinery/pkg/fields"
19+ "k8s.io/apimachinery/pkg/labels"
20+ "k8s.io/apimachinery/pkg/selection"
1721 "k8s.io/client-go/kubernetes"
1822 typedbatchv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
1923 "k8s.io/client-go/rest"
@@ -25,6 +29,7 @@ const (
2529 bufferSize = 4096
2630 sleepTime = 500
2731 defaultTTLSecondsAfterFinished = 60
32+ trueString = "true"
2833)
2934
3035var (
@@ -48,6 +53,72 @@ type Pod struct {
4853 Containers []string `json:"containers"`
4954}
5055
56+ type LogOptions struct {
57+ App string `mapstructure:"app"`
58+ Pod string `mapstructure:"pod"`
59+ Container string `mapstructure:"container"`
60+ Follow string `mapstructure:"follow"`
61+ Previous string `mapstructure:"previous"`
62+ SinceSeconds string `mapstructure:"since_seconds"`
63+ Timestamps string `mapstructure:"timestamps"`
64+ TailLines string `mapstructure:"tail_lines"`
65+ }
66+
67+ func (l LogOptions ) getPodListOptions () (metav1.ListOptions , error ) {
68+ labelSelector := labels .NewSelector ()
69+ fieldSelector := fields .Everything ()
70+ r , err := labels .NewRequirement ("app" , selection .Equals , []string {l .App })
71+ if err != nil {
72+ return metav1.ListOptions {}, err
73+ }
74+ labelSelector = labelSelector .Add (* r )
75+
76+ if l .Pod != "" {
77+ fieldSelector = fields .AndSelectors (fieldSelector , fields .OneTermEqualSelector ("metadata.name" , l .Pod ))
78+ }
79+
80+ return metav1.ListOptions {
81+ LabelSelector : labelSelector .String (),
82+ FieldSelector : fieldSelector .String (),
83+ }, nil
84+ }
85+
86+ func (l LogOptions ) getPodLogOptions () (* corev1.PodLogOptions , error ) {
87+ podLogOpts := & corev1.PodLogOptions {
88+ Container : l .Container ,
89+ }
90+
91+ if l .Follow == trueString {
92+ podLogOpts .Follow = true
93+ }
94+
95+ if l .Previous == trueString {
96+ podLogOpts .Previous = true
97+ }
98+
99+ if l .Timestamps == trueString {
100+ podLogOpts .Timestamps = true
101+ }
102+
103+ if l .SinceSeconds != "" {
104+ ss , err := strconv .ParseInt (l .SinceSeconds , 10 , 64 )
105+ if err != nil {
106+ return nil , err
107+ }
108+ podLogOpts .SinceSeconds = & ss
109+ }
110+
111+ if l .TailLines != "" {
112+ tl , err := strconv .ParseInt (l .TailLines , 10 , 64 )
113+ if err != nil {
114+ return nil , err
115+ }
116+ podLogOpts .TailLines = & tl
117+ }
118+
119+ return podLogOpts , nil
120+ }
121+
51122func DefaultClientConfig () Config {
52123 var defaultProviderConfig Config
53124 defaults .SetDefaults (& defaultProviderConfig )
@@ -62,44 +133,14 @@ func NewClient(config Config) *Client {
62133}
63134
64135func (c Client ) StreamLogs (ctx context.Context , namespace string , filter map [string ]string ) (<- chan LogChunk , error ) {
65- var selectors []string
66- var podName , containerName , labelSelector , filedSelector string
67- var sinceSeconds , tailLines int64
68- var opts metav1.ListOptions
136+ var logOptions LogOptions
69137
70- for k , v := range filter {
71- switch k {
72- case "pod" :
73- podName = v
74- case "container" :
75- containerName = v
76- case "sinceSeconds" :
77- i , err := strconv .ParseInt (v , 10 , 64 )
78- if err != nil {
79- return nil , errors .ErrInvalid .WithMsgf ("invalid sinceSeconds filter value: %v" , err )
80- }
81- sinceSeconds = i
82- case "tailLine" :
83- i , err := strconv .ParseInt (v , 10 , 64 )
84- if err != nil {
85- return nil , errors .ErrInvalid .WithMsgf ("invalid tailLine filter value: %v" , err )
86- }
87- tailLines = i
88- default :
89- s := fmt .Sprintf ("%s=%s" , k , v )
90- selectors = append (selectors , s )
91- }
92- }
93-
94- if podName == "" {
95- labelSelector = strings .Join (selectors , "," )
96- opts = metav1.ListOptions {LabelSelector : labelSelector }
97- } else {
98- filedSelector = fmt .Sprintf ("metadata.name=%s" , podName )
99- opts = metav1.ListOptions {FieldSelector : filedSelector }
138+ err := mapstructure .Decode (filter , & logOptions )
139+ if err != nil {
140+ return nil , errors .ErrInvalid .WithMsgf (err .Error ())
100141 }
101142
102- return c .streamFromPods (ctx , namespace , containerName , opts , tailLines , sinceSeconds , filter )
143+ return c .streamFromPods (ctx , namespace , logOptions )
103144}
104145
105146func (c Client ) RunJob (ctx context.Context , namespace , name string , image string , cmd []string , retries int32 ) error {
@@ -170,13 +211,18 @@ func waitForJob(ctx context.Context, jobName string, jobs typedbatchv1.JobInterf
170211 }
171212}
172213
173- func (c Client ) streamFromPods (ctx context.Context , namespace , containerName string , opts metav1. ListOptions , tailLines , sinceSeconds int64 , filter map [ string ] string ) (<- chan LogChunk , error ) {
214+ func (c Client ) streamFromPods (ctx context.Context , namespace string , logOptions LogOptions ) (<- chan LogChunk , error ) {
174215 clientSet , err := kubernetes .NewForConfig (& c .restConfig )
175216 if err != nil {
176217 return nil , err
177218 }
178219
179- pods , err := clientSet .CoreV1 ().Pods (namespace ).List (ctx , opts )
220+ listOpts , err := logOptions .getPodListOptions ()
221+ if err != nil {
222+ return nil , err
223+ }
224+
225+ pods , err := clientSet .CoreV1 ().Pods (namespace ).List (ctx , listOpts )
180226 if err != nil {
181227 return nil , err
182228 }
@@ -191,16 +237,21 @@ func (c Client) streamFromPods(ctx context.Context, namespace, containerName str
191237 wg := & sync.WaitGroup {}
192238 for _ , pod := range pods .Items {
193239 for _ , container := range append (pod .Spec .InitContainers , pod .Spec .Containers ... ) {
194- if containerName != "" && container .Name != containerName {
240+ if logOptions . Container != "" && container .Name != logOptions . Container {
195241 continue
196242 }
243+ plo , err := logOptions .getPodLogOptions ()
244+ if err != nil {
245+ return nil , err
246+ }
247+ plo .Container = container .Name
197248 wg .Add (1 )
198- go func (podName string , c corev1.Container ) {
249+ go func (podName string , plo corev1.PodLogOptions ) {
199250 defer wg .Done ()
200- if err := streamContainerLogs (ctx , namespace , podName , logCh , streamingClientSet , c , tailLines , sinceSeconds , filter ); err != nil {
201- log .Printf ("[WARN] failed to stream from container '%s':%s" , c . Name , err )
251+ if err := streamContainerLogs (ctx , namespace , podName , logCh , streamingClientSet , plo ); err != nil {
252+ log .Printf ("[WARN] failed to stream from container '%s':%s" , plo . Container , err )
202253 }
203- }(pod .Name , container )
254+ }(pod .Name , * plo )
204255 }
205256 }
206257
@@ -249,27 +300,14 @@ func (c Client) GetPodDetails(ctx context.Context, namespace string, labelSelect
249300 return podDetails , nil
250301}
251302
252- func streamContainerLogs (ctx context.Context , ns , podName string , logCh chan <- LogChunk , clientSet * kubernetes.Clientset , container corev1.Container , tailLines , sinceSeconds int64 , filter map [string ]string ) error {
253- podLogOpts := corev1.PodLogOptions {}
254- podLogOpts .Follow = true
255- podLogOpts .Container = container .Name
256-
257- if sinceSeconds > 0 {
258- podLogOpts .SinceSeconds = & sinceSeconds
259- }
260-
261- if tailLines > 0 {
262- podLogOpts .TailLines = & tailLines
263- }
264-
303+ func streamContainerLogs (ctx context.Context , ns , podName string , logCh chan <- LogChunk , clientSet * kubernetes.Clientset ,
304+ podLogOpts corev1.PodLogOptions ,
305+ ) error {
265306 podLogs , err := clientSet .CoreV1 ().Pods (ns ).GetLogs (podName , & podLogOpts ).Stream (ctx )
266307 if err != nil {
267308 return err
268309 }
269310
270- filter ["pod" ] = podName
271- filter ["container" ] = container .Name
272-
273311 buf := make ([]byte , bufferSize )
274312 for {
275313 numBytes , err := podLogs .Read (buf )
@@ -285,7 +323,7 @@ func streamContainerLogs(ctx context.Context, ns, podName string, logCh chan<- L
285323
286324 logChunk := LogChunk {
287325 Data : []byte (string (buf [:numBytes ])),
288- Labels : filter ,
326+ Labels : map [ string ] string { "pod" : podName , "container" : podLogOpts . Container } ,
289327 }
290328
291329 select {
0 commit comments