Kubernetes CronJob Controller Source Code Analysis

A recent project of mine needed Kubernetes CronJobs, mainly to run a scheduled backup task. When I first started using them, they did not run as I expected, so I decided to read the CronJob Controller code and see how the feature is actually implemented. I also noticed that nobody else seems to have written an analysis of the CronJob Controller code (maybe it is too simple to be worth writing about), so this is a good opportunity to put one on record.

The CronJob Controller code lives under kubernetes/pkg/controller/cronjob, with the main logic in cronjob_controller.go in that directory. The analysis here is based on the v1.10.2 code, which can be viewed directly on GitHub.

Let's skip over how the controller is constructed and go straight to the part that runs it:

func (jm *CronJobController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
glog.Infof("Starting CronJob Manager")
// Check things every 10 second.
go wait.Until(jm.syncAll, 10*time.Second, stopCh)
<-stopCh
glog.Infof("Shutting down CronJob Manager")
}

This is very simple: it runs syncAll once every 10 seconds until stopCh is closed.
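wait.Until comes from k8s.io/apimachinery/pkg/util/wait: it calls the function, sleeps for the period, and repeats until the stop channel is closed. Here is a minimal standalone sketch of that pattern (nothing controller-specific, just the library call):

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stopCh := make(chan struct{})
	// Run the function, wait 10 seconds, and repeat until stopCh is closed.
	// This is the same pattern Run uses above to drive syncAll.
	go wait.Until(func() { fmt.Println("tick") }, 10*time.Second, stopCh)

	time.Sleep(25 * time.Second) // let it tick a couple of times
	close(stopCh)                // stop the loop
}

Next, let's look at syncAll itself: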

func (jm *CronJobController) syncAll() {
// List children (Jobs) before parents (CronJob).
// This guarantees that if we see any Job that got orphaned by the GC orphan finalizer,
// we must also see that the parent CronJob has non-nil DeletionTimestamp (see #42639).
// Note that this only works because we are NOT using any caches here.
// First list all Jobs, mainly so that every CronJob can later be matched with the Jobs it created
jl, err := jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(metav1.ListOptions{})
if err != nil {
utilruntime.HandleError(fmt.Errorf("can't list Jobs: %v", err))
return
}
js := jl.Items
glog.V(4).Infof("Found %d jobs", len(js))
// Then list all CronJobs
sjl, err := jm.kubeClient.BatchV1beta1().CronJobs(metav1.NamespaceAll).List(metav1.ListOptions{})
if err != nil {
utilruntime.HandleError(fmt.Errorf("can't list CronJobs: %v", err))
return
}
sjs := sjl.Items
glog.V(4).Infof("Found %d cronjobs", len(sjs))

// Build a map[uid][]Job, grouping every Job under the UID of the CronJob that owns it
jobsBySj := groupJobsByParent(js)
glog.V(4).Infof("Found %d groups", len(jobsBySj))

for _, sj := range sjs {
// Call syncOne for each CronJob
syncOne(&sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.podControl, jm.recorder)
// Delete surplus Jobs according to the configuration, mainly SuccessfulJobsHistoryLimit and FailedJobsHistoryLimit
cleanupFinishedJobs(&sj, jobsBySj[sj.UID], jm.jobControl, jm.sjControl, jm.podControl, jm.recorder)
}
}
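groupJobsByParent itself is not shown here; below is a rough, hypothetical sketch of what that grouping can look like, assuming the owning CronJob is identified through each Job's controller owner reference (groupJobsByParentSketch and its details are illustrative, not the controller's actual helper):

// Illustrative only: bucket Jobs by the UID of the CronJob that owns them,
// based on each Job's controller owner reference.
// imports assumed: batchv1 "k8s.io/api/batch/v1",
//                  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1",
//                  "k8s.io/apimachinery/pkg/types"
func groupJobsByParentSketch(js []batchv1.Job) map[types.UID][]batchv1.Job {
	jobsBySj := make(map[types.UID][]batchv1.Job)
	for i := range js {
		ref := metav1.GetControllerOf(&js[i])
		if ref == nil || ref.Kind != "CronJob" {
			continue // not owned by a CronJob, skip it
		}
		jobsBySj[ref.UID] = append(jobsBySj[ref.UID], js[i])
	}
	return jobsBySj
}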

That brings us to the key method, syncOne:

func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, pc podControlInterface, recorder record.EventRecorder) {
nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)

// First scan all child Jobs: look for orphans that are not in the Active list, and for Jobs that have already finished but are still in the Active list; record Events and remove the stale entries.
childrenJobs := make(map[types.UID]bool)
for _, j := range js {
childrenJobs[j.ObjectMeta.UID] = true
found := inActiveList(*sj, j.ObjectMeta.UID)
if !found && !IsJobFinished(&j) {
recorder.Eventf(sj, v1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %v", j.Name)
// We found an unfinished job that has us as the parent, but it is not in our Active list.
// This could happen if we crashed right after creating the Job and before updating the status,
// or if our jobs list is newer than our sj status after a relist, or if someone intentionally created
// a job that they wanted us to adopt.

// TODO: maybe handle the adoption case? Concurrency/suspend rules will not apply in that case, obviously, since we can't
// stop users from creating jobs if they have permission. It is assumed that if a
// user has permission to create a job within a namespace, then they have permission to make any scheduledJob
// in the same namespace "adopt" that job. ReplicaSets and their Pods work the same way.
// TBS: how to update sj.Status.LastScheduleTime if the adopted job is newer than any we knew about?
} else if found && IsJobFinished(&j) {
deleteFromActiveList(sj, j.ObjectMeta.UID)
// TODO: event to call out failure vs success.
recorder.Eventf(sj, v1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %v", j.Name)
}
}

// Remove any job reference from the active list if the corresponding job does not exist any more.
// Otherwise, the cronjob may be stuck in active mode forever even though there is no matching
// job running.
// Then check whether the Active list references any Jobs that no longer exist, and if so remove those entries too
for _, j := range sj.Status.Active {
if found := childrenJobs[j.UID]; !found {
recorder.Eventf(sj, v1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name)
deleteFromActiveList(sj, j.UID)
}
}
// Update the CronJob's status
updatedSJ, err := sjc.UpdateStatus(sj)
if err != nil {
glog.Errorf("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
return
}
*sj = *updatedSJ

// Check whether the CronJob is being deleted; if so, do nothing further
if sj.DeletionTimestamp != nil {
// The CronJob is being deleted.
// Don't do anything other than updating status.
return
}
// Check whether scheduling is suspended; if so, do nothing further
if sj.Spec.Suspend != nil && *sj.Spec.Suspend {
glog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog)
return
}
// Compute the list of schedule times that should have been run by now
times, err := getRecentUnmetScheduleTimes(*sj, now)
if err != nil {
recorder.Eventf(sj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err)
glog.Errorf("Cannot determine if %s needs to be started: %v", nameForLog, err)
return
}
// TODO: handle multiple unmet start times, from oldest to newest, updating status as needed.
if len(times) == 0 {
glog.V(4).Infof("No unmet start times for %s", nameForLog)
return
}
if len(times) > 1 {
glog.V(4).Infof("Multiple unmet start times for %s so only starting last one", nameForLog)
}
// Take the most recent of the times that need to run
scheduledTime := times[len(times)-1]
tooLate := false
// If StartingDeadlineSeconds is configured, check whether the current time is already past the start deadline
if sj.Spec.StartingDeadlineSeconds != nil {
tooLate = scheduledTime.Add(time.Second * time.Duration(*sj.Spec.StartingDeadlineSeconds)).Before(now)
}
// If we are past the deadline, skip this run
if tooLate {
glog.V(4).Infof("Missed starting window for %s", nameForLog)
// TODO: generate an event for a miss. Use a warning level event because it indicates a
// problem with the controller (restart or long queue), and is not expected by user either.
// Since we don't set LastScheduleTime when not scheduling, we are going to keep noticing
// the miss every cycle. In order to avoid sending multiple events, and to avoid processing
// the sj again and again, we could set a Status.LastMissedTime when we notice a miss.
// Then, when we call getRecentUnmetScheduleTimes, we can take max(creationTimestamp,
// Status.LastScheduleTime, Status.LastMissedTime), and then so we won't generate
// and event the next time we process it, and also so the user looking at the status
// can see easily that there was a missed execution.
return
}
// If ConcurrencyPolicy is Forbid (no concurrent runs) and there is still an active Job, do not start a new one
if sj.Spec.ConcurrencyPolicy == batchv1beta1.ForbidConcurrent && len(sj.Status.Active) > 0 {
// Regardless which source of information we use for the set of active jobs,
// there is some risk that we won't see an active job when there is one.
// (because we haven't seen the status update to the SJ or the created pod).
// So it is theoretically possible to have concurrency with Forbid.
// As long the as the invokations are "far enough apart in time", this usually won't happen.
//
// TODO: for Forbid, we could use the same name for every execution, as a lock.
// With replace, we could use a name that is deterministic per execution time.
// But that would mean that you could not inspect prior successes or failures of Forbid jobs.
glog.V(4).Infof("Not starting job for %s because of prior execution still running and concurrency policy is Forbid", nameForLog)
return
}
// If ConcurrencyPolicy is Replace and there are still running Jobs, delete those Jobs first
if sj.Spec.ConcurrencyPolicy == batchv1beta1.ReplaceConcurrent {
for _, j := range sj.Status.Active {
// TODO: this should be replaced with server side job deletion
// currently this mimics JobReaper from pkg/kubectl/stop.go
glog.V(4).Infof("Deleting job %s of %s that was still running at next scheduled start time", j.Name, nameForLog)

job, err := jc.GetJob(j.Namespace, j.Name)
if err != nil {
recorder.Eventf(sj, v1.EventTypeWarning, "FailedGet", "Get job: %v", err)
return
}
if !deleteJob(sj, job, jc, pc, recorder, "") {
return
}
}
}
// Build a Job object from the JobTemplate in the CronJob's Spec; the Job's name gets a hash computed from scheduledTime appended, currently the unix timestamp
jobReq, err := getJobFromTemplate(sj, scheduledTime)
if err != nil {
glog.Errorf("Unable to make Job from template in %s: %v", nameForLog, err)
return
}
// Call the API to create the new Job
jobResp, err := jc.CreateJob(sj.Namespace, jobReq)
if err != nil {
recorder.Eventf(sj, v1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
return
}
glog.V(4).Infof("Created Job %s for %s", jobResp.Name, nameForLog)
recorder.Eventf(sj, v1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)

// ------------------------------------------------------------------ //

// If this process restarts at this point (after posting a job, but
// before updating the status), then we might try to start the job on
// the next time. Actually, if we relist the SJs and Jobs on the next
// iteration of syncAll, we might not see our own status update, and
// then post one again. So, we need to use the job name as a lock to
// prevent us from making the job twice (name the job with hash of its
// scheduled time).

// Add the just-started job to the status list.
// Add the just-created Job to the CronJob's Active list, set LastScheduleTime, and update the CronJob
ref, err := getRef(jobResp)
if err != nil {
glog.V(2).Infof("Unable to make object reference for job for %s", nameForLog)
} else {
sj.Status.Active = append(sj.Status.Active, *ref)
}
sj.Status.LastScheduleTime = &metav1.Time{Time: scheduledTime}
if _, err := sjc.UpdateStatus(sj); err != nil {
glog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
}

return
}
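The long comment near the end explains why the Job name doubles as a lock against creating the same run twice: it is derived from the scheduled time. Here is a hypothetical sketch of such a naming scheme, assuming the "hash" mentioned in the comments is simply the Unix timestamp of scheduledTime (jobNameForSchedule is an illustrative name, not the controller's helper):

// Illustrative only: derive a per-run Job name from the CronJob name and the
// scheduled time, so a second attempt to create the same run fails with a
// name conflict instead of producing a duplicate Job.
func jobNameForSchedule(sj *batchv1beta1.CronJob, scheduledTime time.Time) string {
	// assumption: the "hash" is the Unix timestamp of the scheduled time
	return fmt.Sprintf("%s-%d", sj.Name, scheduledTime.Unix())
}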

The overall logic is fairly simple and clear. One method worth a closer look is getRecentUnmetScheduleTimes, which computes the list of schedule times that still need to run:

func getRecentUnmetScheduleTimes(sj batchv1beta1.CronJob, now time.Time) ([]time.Time, error) {
starts := []time.Time{}
// Parse the Schedule using robfig/cron
sched, err := cron.ParseStandard(sj.Spec.Schedule)
if err != nil {
return starts, fmt.Errorf("Unparseable schedule: %s : %s", sj.Spec.Schedule, err)
}

// Determine the starting time: if the CronJob has run before, use the last schedule time; otherwise use the CronJob's creation time
var earliestTime time.Time
if sj.Status.LastScheduleTime != nil {
earliestTime = sj.Status.LastScheduleTime.Time
} else {
// If none found, then this is either a recently created scheduledJob,
// or the active/completed info was somehow lost (contract for status
// in kubernetes says it may need to be recreated), or that we have
// started a job, but have not noticed it yet (distributed systems can
// have arbitrary delays). In any case, use the creation time of the
// CronJob as last known start time.
earliestTime = sj.ObjectMeta.CreationTimestamp.Time
}
// If StartingDeadlineSeconds is set and now minus that value is later than the starting time, use that later time instead
if sj.Spec.StartingDeadlineSeconds != nil {
// Controller is not going to schedule anything below this point
schedulingDeadline := now.Add(-time.Second * time.Duration(*sj.Spec.StartingDeadlineSeconds))

if schedulingDeadline.After(earliestTime) {
earliestTime = schedulingDeadline
}
}
// If the starting time is later than now, there is nothing to do
if earliestTime.After(now) {
return []time.Time{}, nil
}
// Enumerate every schedule time from the starting time up to now.
// A Cron may have missed many runs, so all of them have to be computed, but past a certain
// point that stops being meaningful: only up to 100 are considered.
for t := sched.Next(earliestTime); !t.After(now); t = sched.Next(t) {
starts = append(starts, t)
// An object might miss several starts. For example, if
// controller gets wedged on friday at 5:01pm when everyone has
// gone home, and someone comes in on tuesday AM and discovers
// the problem and restarts the controller, then all the hourly
// jobs, more than 80 of them for one hourly scheduledJob, should
// all start running with no further intervention (if the scheduledJob
// allows concurrency and late starts).
//
// However, if there is a bug somewhere, or incorrect clock
// on controller's server or apiservers (for setting creationTimestamp)
// then there could be so many missed start times (it could be off
// by decades or more), that it would eat up all the CPU and memory
// of this controller. In that case, we want to not try to list
// all the missed start times.
//
// I've somewhat arbitrarily picked 100, as more than 80,
// but less than "lots".
if len(starts) > 100 {
// We can't get the most recent times so just return an empty slice
return []time.Time{}, fmt.Errorf("Too many missed start time (> 100). Set or decrease .spec.startingDeadlineSeconds or check clock skew.")
}
}
return starts, nil
}
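To make the enumeration loop concrete, here is a small standalone example (the schedule and the times are made up for illustration) showing how sched.Next walks from the starting time up to now:

package main

import (
	"fmt"
	"time"

	"github.com/robfig/cron"
)

func main() {
	// An hourly schedule and a made-up window of a bit over two hours.
	sched, _ := cron.ParseStandard("0 * * * *")
	earliest := time.Date(2018, 6, 1, 10, 30, 0, 0, time.UTC)
	now := time.Date(2018, 6, 1, 12, 45, 0, 0, time.UTC)

	// Same loop shape as getRecentUnmetScheduleTimes: every schedule time
	// after earliest and not after now counts as an unmet start time.
	for t := sched.Next(earliest); !t.After(now); t = sched.Next(t) {
		fmt.Println(t) // prints 11:00 and 12:00 on 2018-06-01
	}
}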

That is essentially all of the main business logic. Overall it is quite "brute force" and simple: no Informers or anything like that, just constant polling, computing which runs are due, and creating the Jobs.

One thing worth grumbling about: in earlier versions the Kubernetes CronJob was called ScheduledJob, and when it was renamed the variable names were never updated, so the code is still full of ScheduledJob abbreviations like sj. At first I kept wondering why the variable names looked so strange.

With the code read, back to the question at the beginning: what went wrong? It turned out to be a timezone problem. The container running controller-manager is on UTC, but Unix Cron syntax has no concept of timezones, and as someone in China I naturally wrote the Cron schedule in Beijing time. The result: everything ran 8 hours off. Oops.
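To illustrate the 8-hour offset with a standalone sketch (the schedule and times are made up): robfig/cron evaluates the schedule against whatever location the input time carries, so a schedule written with 02:00 Beijing time in mind actually fires at 02:00 UTC, which is 10:00 in Beijing, when the controller's clock is UTC:

package main

import (
	"fmt"
	"time"

	"github.com/robfig/cron"
)

func main() {
	// The schedule was written with Beijing time in mind: run at 02:00.
	sched, _ := cron.ParseStandard("0 2 * * *")

	// But the controller-manager container runs on UTC.
	nowUTC := time.Date(2018, 6, 1, 0, 0, 0, 0, time.UTC)
	next := sched.Next(nowUTC)

	cst, _ := time.LoadLocation("Asia/Shanghai")
	fmt.Println(next)         // 2018-06-01 02:00:00 +0000 UTC
	fmt.Println(next.In(cst)) // 2018-06-01 10:00:00 +0800 CST, eight hours later than intended
}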
