Kubernetes CronJob Controller Source Code Analysis

A recent project of mine needed a Kubernetes CronJob to run a periodic backup task. When I first set it up, it did not run as I expected, so I decided to read the CronJob controller's code to see how it actually implements this behavior. I also noticed that nobody else seems to have written a walkthrough of the CronJob controller code (perhaps it is considered too simple to be worth writing about), so this is a good chance to document it.

The CronJob controller lives under kubernetes/pkg/controller/cronjob, and its main logic is in cronjob_controller.go in that directory. The analysis here is based on v1.10.2; you can view it directly on GitHub.

We will skip how the controller is constructed and go straight to the part that runs it:

```go
func (jm *CronJobController) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	glog.Infof("Starting CronJob Manager")
	// Check things every 10 second.
	go wait.Until(jm.syncAll, 10*time.Second, stopCh)
	<-stopCh
	glog.Infof("Shutting down CronJob Manager")
}
```
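For readers unfamiliar with the wait.Until helper from k8s.io/apimachinery, here is a minimal, self-contained sketch of the same pattern (assumed setup, not controller code): the function runs immediately and then once per period until the stop channel is closed.

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stopCh := make(chan struct{})

	// Runs the closure right away, then once every 10 seconds, until stopCh is closed.
	go wait.Until(func() {
		fmt.Println("sync tick at", time.Now().Format(time.RFC3339))
	}, 10*time.Second, stopCh)

	time.Sleep(35 * time.Second) // let it tick a few times
	close(stopCh)                // equivalent to Run returning after <-stopCh
}
```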

So Run is very simple: it calls syncAll every 10 seconds. Let's look at that method next:

```go
func (jm *CronJobController) syncAll() {
	// List children (Jobs) before parents (CronJob).
	// This guarantees that if we see any Job that got orphaned by the GC orphan finalizer,
	// we must also see that the parent CronJob has non-nil DeletionTimestamp (see #42639).
	// Note that this only works because we are NOT using any caches here.
	// First list all Jobs, mainly so each CronJob can be mapped to the Jobs it created.
	jl, err := jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(metav1.ListOptions{})
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("can't list Jobs: %v", err))
		return
	}
	js := jl.Items
	glog.V(4).Infof("Found %d jobs", len(js))
	// Then list all CronJobs.
	sjl, err := jm.kubeClient.BatchV1beta1().CronJobs(metav1.NamespaceAll).List(metav1.ListOptions{})
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("can't list CronJobs: %v", err))
		return
	}
	sjs := sjl.Items
	glog.V(4).Infof("Found %d cronjobs", len(sjs))

	// Build a map[uid][]Job that groups Jobs under the UID of the CronJob that owns them.
	jobsBySj := groupJobsByParent(js)
	glog.V(4).Infof("Found %d groups", len(jobsBySj))

	for _, sj := range sjs {
		// Call syncOne for each CronJob.
		syncOne(&sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.podControl, jm.recorder)
		// Delete surplus Jobs according to SuccessfulJobsHistoryLimit and FailedJobsHistoryLimit.
		cleanupFinishedJobs(&sj, jobsBySj[sj.UID], jm.jobControl, jm.sjControl, jm.podControl, jm.recorder)
	}
}
```
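The grouping helper groupJobsByParent is not shown here, but conceptually it buckets each Job under the UID of the CronJob that owns it. Below is a simplified sketch of that idea using the Job's controller owner reference; the exact lookup in the real helper may differ, so treat this as an illustration rather than the controller's code.

```go
// Sketch: group Jobs by the UID of the CronJob that controls them.
package cronjobsketch

import (
	batchv1 "k8s.io/api/batch/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
)

func groupByOwnerUID(jobs []batchv1.Job) map[types.UID][]batchv1.Job {
	grouped := make(map[types.UID][]batchv1.Job)
	for i := range jobs {
		// GetControllerOf returns the owner reference marked controller=true, if any.
		if ref := metav1.GetControllerOf(&jobs[i]); ref != nil && ref.Kind == "CronJob" {
			grouped[ref.UID] = append(grouped[ref.UID], jobs[i])
		}
	}
	return grouped
}
```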

That brings us to the key method, syncOne:

```go
func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, pc podControlInterface, recorder record.EventRecorder) {
	nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)

	// First scan all child Jobs: record an event for any unfinished Job that is not in the
	// Active list (an orphan), and drop from the Active list any Job that has already finished.
	childrenJobs := make(map[types.UID]bool)
	for _, j := range js {
		childrenJobs[j.ObjectMeta.UID] = true
		found := inActiveList(*sj, j.ObjectMeta.UID)
		if !found && !IsJobFinished(&j) {
			recorder.Eventf(sj, v1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %v", j.Name)
			// We found an unfinished job that has us as the parent, but it is not in our Active list.
			// This could happen if we crashed right after creating the Job and before updating the status,
			// or if our jobs list is newer than our sj status after a relist, or if someone intentionally created
			// a job that they wanted us to adopt.

			// TODO: maybe handle the adoption case?  Concurrency/suspend rules will not apply in that case, obviously, since we can't
			// stop users from creating jobs if they have permission.  It is assumed that if a
			// user has permission to create a job within a namespace, then they have permission to make any scheduledJob
			// in the same namespace "adopt" that job.  ReplicaSets and their Pods work the same way.
			// TBS: how to update sj.Status.LastScheduleTime if the adopted job is newer than any we knew about?
		} else if found && IsJobFinished(&j) {
			deleteFromActiveList(sj, j.ObjectMeta.UID)
			// TODO: event to call out failure vs success.
			recorder.Eventf(sj, v1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %v", j.Name)
		}
	}

	// Remove any job reference from the active list if the corresponding job does not exist any more.
	// Otherwise, the cronjob may be stuck in active mode forever even though there is no matching
	// job running.
	// Then check whether the Active list references Jobs that no longer exist, and remove those entries too.
	for _, j := range sj.Status.Active {
		if found := childrenJobs[j.UID]; !found {
			recorder.Eventf(sj, v1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name)
			deleteFromActiveList(sj, j.UID)
		}
	}
	// Update the CronJob's status.
	updatedSJ, err := sjc.UpdateStatus(sj)
	if err != nil {
		glog.Errorf("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
		return
	}
	*sj = *updatedSJ

	// If the CronJob is being deleted, do nothing further.
	if sj.DeletionTimestamp != nil {
		// The CronJob is being deleted.
		// Don't do anything other than updating status.
		return
	}
	// If the CronJob is suspended, do nothing further.
	if sj.Spec.Suspend != nil && *sj.Spec.Suspend {
		glog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog)
		return
	}
	// Compute the list of schedule times that should have fired by now.
	times, err := getRecentUnmetScheduleTimes(*sj, now)
	if err != nil {
		recorder.Eventf(sj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err)
		glog.Errorf("Cannot determine if %s needs to be started: %v", nameForLog, err)
		return
	}
	// TODO: handle multiple unmet start times, from oldest to newest, updating status as needed.
	if len(times) == 0 {
		glog.V(4).Infof("No unmet start times for %s", nameForLog)
		return
	}
	if len(times) > 1 {
		glog.V(4).Infof("Multiple unmet start times for %s so only starting last one", nameForLog)
	}
	// Take the most recent of the unmet schedule times.
	scheduledTime := times[len(times)-1]
	tooLate := false
	// If StartingDeadlineSeconds is set, check whether we are already past the start deadline.
	if sj.Spec.StartingDeadlineSeconds != nil {
		tooLate = scheduledTime.Add(time.Second * time.Duration(*sj.Spec.StartingDeadlineSeconds)).Before(now)
	}
	// If we missed the starting window, skip this run.
	if tooLate {
		glog.V(4).Infof("Missed starting window for %s", nameForLog)
		// TODO: generate an event for a miss.  Use a warning level event because it indicates a
		// problem with the controller (restart or long queue), and is not expected by user either.
		// Since we don't set LastScheduleTime when not scheduling, we are going to keep noticing
		// the miss every cycle.  In order to avoid sending multiple events, and to avoid processing
		// the sj again and again, we could set a Status.LastMissedTime when we notice a miss.
		// Then, when we call getRecentUnmetScheduleTimes, we can take max(creationTimestamp,
		// Status.LastScheduleTime, Status.LastMissedTime), and then so we won't generate
		// and event the next time we process it, and also so the user looking at the status
		// can see easily that there was a missed execution.
		return
	}
	// If ConcurrencyPolicy is Forbid and a Job is still active, do not start a new one.
	if sj.Spec.ConcurrencyPolicy == batchv1beta1.ForbidConcurrent && len(sj.Status.Active) > 0 {
		// Regardless which source of information we use for the set of active jobs,
		// there is some risk that we won't see an active job when there is one.
		// (because we haven't seen the status update to the SJ or the created pod).
		// So it is theoretically possible to have concurrency with Forbid.
		// As long the as the invokations are "far enough apart in time", this usually won't happen.
		//
		// TODO: for Forbid, we could use the same name for every execution, as a lock.
		// With replace, we could use a name that is deterministic per execution time.
		// But that would mean that you could not inspect prior successes or failures of Forbid jobs.
		glog.V(4).Infof("Not starting job for %s because of prior execution still running and concurrency policy is Forbid", nameForLog)
		return
	}
	// If ConcurrencyPolicy is Replace and there are still running Jobs, delete them first.
	if sj.Spec.ConcurrencyPolicy == batchv1beta1.ReplaceConcurrent {
		for _, j := range sj.Status.Active {
			// TODO: this should be replaced with server side job deletion
			// currently this mimics JobReaper from pkg/kubectl/stop.go
			glog.V(4).Infof("Deleting job %s of %s that was still running at next scheduled start time", j.Name, nameForLog)

			job, err := jc.GetJob(j.Namespace, j.Name)
			if err != nil {
				recorder.Eventf(sj, v1.EventTypeWarning, "FailedGet", "Get job: %v", err)
				return
			}
			if !deleteJob(sj, job, jc, pc, recorder, "") {
				return
			}
		}
	}
	// Build the Job object from the JobTemplate in the CronJob spec; the Job name gets a
	// hash of scheduledTime appended (currently its unix timestamp).
	jobReq, err := getJobFromTemplate(sj, scheduledTime)
	if err != nil {
		glog.Errorf("Unable to make Job from template in %s: %v", nameForLog, err)
		return
	}
	// Call the API to create the new Job.
	jobResp, err := jc.CreateJob(sj.Namespace, jobReq)
	if err != nil {
		recorder.Eventf(sj, v1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
		return
	}
	glog.V(4).Infof("Created Job %s for %s", jobResp.Name, nameForLog)
	recorder.Eventf(sj, v1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)

	// ------------------------------------------------------------------ //

	// If this process restarts at this point (after posting a job, but
	// before updating the status), then we might try to start the job on
	// the next time.  Actually, if we relist the SJs and Jobs on the next
	// iteration of syncAll, we might not see our own status update, and
	// then post one again.  So, we need to use the job name as a lock to
	// prevent us from making the job twice (name the job with hash of its
	// scheduled time).

	// Add the just-started job to the status list.
	// Add the just-created Job to the CronJob's Active list, set LastScheduleTime, and update the CronJob.
	ref, err := getRef(jobResp)
	if err != nil {
		glog.V(2).Infof("Unable to make object reference for job for %s", nameForLog)
	} else {
		sj.Status.Active = append(sj.Status.Active, *ref)
	}
	sj.Status.LastScheduleTime = &metav1.Time{Time: scheduledTime}
	if _, err := sjc.UpdateStatus(sj); err != nil {
		glog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
	}

	return
}
```
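The "use the job name as a lock" comment near the end is worth spelling out. Because the Job name is derived deterministically from the scheduled time (the comment in getJobFromTemplate above notes the hash is currently the unix timestamp), a second attempt to create the Job for the same tick fails with an AlreadyExists error instead of launching a duplicate. A sketch of that naming scheme, with hypothetical helper names:

```go
package main

import (
	"fmt"
	"time"
)

// jobNameForRun is an illustrative helper, not the controller's actual function;
// it mirrors the documented idea of appending the scheduled time's unix timestamp.
func jobNameForRun(cronJobName string, scheduledTime time.Time) string {
	// A controller that crashes after creating the Job and re-syncs will compute
	// the same name again, so the apiserver rejects the duplicate create.
	return fmt.Sprintf("%s-%d", cronJobName, scheduledTime.Unix())
}

func main() {
	t := time.Date(2018, 5, 20, 2, 0, 0, 0, time.UTC)
	fmt.Println(jobNameForRun("backup", t)) // backup-1526781600
}
```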

The overall logic is fairly simple and clear. One helper worth a closer look is getRecentUnmetScheduleTimes, which computes the list of schedule times that should have fired:

```go
func getRecentUnmetScheduleTimes(sj batchv1beta1.CronJob, now time.Time) ([]time.Time, error) {
	starts := []time.Time{}
	// Parse the schedule with robfig/cron.
	sched, err := cron.ParseStandard(sj.Spec.Schedule)
	if err != nil {
		return starts, fmt.Errorf("Unparseable schedule: %s : %s", sj.Spec.Schedule, err)
	}

	// Determine the starting point: if the CronJob has been scheduled before, use the last
	// schedule time; otherwise use the CronJob's creation time.
	var earliestTime time.Time
	if sj.Status.LastScheduleTime != nil {
		earliestTime = sj.Status.LastScheduleTime.Time
	} else {
		// If none found, then this is either a recently created scheduledJob,
		// or the active/completed info was somehow lost (contract for status
		// in kubernetes says it may need to be recreated), or that we have
		// started a job, but have not noticed it yet (distributed systems can
		// have arbitrary delays).  In any case, use the creation time of the
		// CronJob as last known start time.
		earliestTime = sj.ObjectMeta.CreationTimestamp.Time
	}
	// If StartingDeadlineSeconds is set and now minus that value is later than the
	// starting point, use that later time instead.
	if sj.Spec.StartingDeadlineSeconds != nil {
		// Controller is not going to schedule anything below this point
		schedulingDeadline := now.Add(-time.Second * time.Duration(*sj.Spec.StartingDeadlineSeconds))

		if schedulingDeadline.After(earliestTime) {
			earliestTime = schedulingDeadline
		}
	}
	// If the starting point is in the future, there is nothing to do.
	if earliestTime.After(now) {
		return []time.Time{}, nil
	}
	// Enumerate every schedule time between the starting point and now.
	// A CronJob may have missed many runs, so all of them are computed, but past a certain
	// point that stops being useful; only up to 100 are considered.
	for t := sched.Next(earliestTime); !t.After(now); t = sched.Next(t) {
		starts = append(starts, t)
		// An object might miss several starts. For example, if
		// controller gets wedged on friday at 5:01pm when everyone has
		// gone home, and someone comes in on tuesday AM and discovers
		// the problem and restarts the controller, then all the hourly
		// jobs, more than 80 of them for one hourly scheduledJob, should
		// all start running with no further intervention (if the scheduledJob
		// allows concurrency and late starts).
		//
		// However, if there is a bug somewhere, or incorrect clock
		// on controller's server or apiservers (for setting creationTimestamp)
		// then there could be so many missed start times (it could be off
		// by decades or more), that it would eat up all the CPU and memory
		// of this controller. In that case, we want to not try to list
		// all the missed start times.
		//
		// I've somewhat arbitrarily picked 100, as more than 80,
		// but less than "lots".
		if len(starts) > 100 {
			// We can't get the most recent times so just return an empty slice
			return []time.Time{}, fmt.Errorf("Too many missed start time (> 100). Set or decrease .spec.startingDeadlineSeconds or check clock skew.")
		}
	}
	return starts, nil
}
```
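To make the enumeration loop concrete, here is a small standalone example using github.com/robfig/cron, the same library the controller imports. The schedule and times are made up: an hourly schedule, a last run at 05:00, and a sync at 08:30.

```go
package main

import (
	"fmt"
	"time"

	"github.com/robfig/cron"
)

func main() {
	// Standard 5-field spec: minute 0 of every hour.
	sched, err := cron.ParseStandard("0 * * * *")
	if err != nil {
		panic(err)
	}

	// Pretend the controller last scheduled a run at 05:00 and is syncing at 08:30.
	last := time.Date(2018, 5, 20, 5, 0, 0, 0, time.UTC)
	now := time.Date(2018, 5, 20, 8, 30, 0, 0, time.UTC)

	// Same loop shape as getRecentUnmetScheduleTimes: walk forward from the last
	// known schedule time until we pass "now".
	var starts []time.Time
	for t := sched.Next(last); !t.After(now); t = sched.Next(t) {
		starts = append(starts, t)
	}
	fmt.Println(starts) // 06:00, 07:00, 08:00 — syncOne only acts on the last one
}
```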

That is essentially all of the business logic. Overall it is quite "brute force" and simple: no informers or similar machinery, just a loop that keeps polling, computes which runs are due, and creates the corresponding Jobs.

One gripe: in earlier versions this resource was called ScheduledJob and was later renamed to CronJob, but many variables in the code still use the old abbreviation sj. The resource was renamed without renaming the variables, which made them look quite odd when I first read the code.

With the code covered, back to the original question: what went wrong? It turned out to be a timezone issue. The container running controller-manager uses UTC, while standard cron syntax has no notion of timezones. Being in China, I naturally wrote the schedule in Beijing time, so everything fired 8 hours off.
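A quick sketch of the mismatch (the schedule here is an assumed example, not from my project): the controller evaluates the schedule against time.Now() in its own container, so a "2 AM" entry means 02:00 UTC, which is 10:00 in Beijing.

```go
package main

import (
	"fmt"
	"time"

	"github.com/robfig/cron"
)

func main() {
	// Intended as "02:00 every day, Beijing time", but cron has no timezone field.
	sched, _ := cron.ParseStandard("0 2 * * *")

	// The controller calls time.Now() in the controller-manager container,
	// which in my case ran in UTC.
	nowUTC := time.Date(2018, 5, 20, 0, 0, 0, 0, time.UTC)
	next := sched.Next(nowUTC)

	beijing := time.FixedZone("CST", 8*60*60) // UTC+8
	fmt.Println("fires at", next, "=", next.In(beijing), "Beijing time") // 02:00 UTC = 10:00 Beijing
}
```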