/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.jobcontrol;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob.State;
import org.apache.hadoop.util.StringUtils;

/**
 *  This class encapsulates a set of MapReduce jobs and their dependencies.
 *
 *  It tracks the states of the jobs by placing them into different tables
 *  according to their states.
 *
 *  This class provides APIs for the client app to add a job to the group
 *  and to get the jobs in the group in different states. When a job is
 *  added, an ID unique to the group is assigned to the job.
 *
 *  This class has a thread that submits jobs when they become ready,
 *  monitors the states of the running jobs, and updates the states of jobs
 *  based on the state changes of the jobs they depend on. The class
 *  provides APIs for suspending/resuming the thread, and
 *  for stopping the thread.
 *
 *  The job tables are guarded by this object's monitor. The thread state is
 *  a volatile field so that requests made through {@link #stop()},
 *  {@link #suspend()} and {@link #resume()} from other threads become visible
 *  to the thread executing {@link #run()}.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class JobControl implements Runnable {
  private static final Log LOG = LogFactory.getLog(JobControl.class);

  /** Time, in milliseconds, the control thread sleeps between two polls. */
  private static final long POLL_INTERVAL_MS = 5000L;

  // The thread can be in one of the following states
  public static enum ThreadState {RUNNING, SUSPENDED, STOPPED, STOPPING, READY};

  // The thread state. Volatile: it is written by client threads and polled
  // by the control thread without holding the monitor.
  private volatile ThreadState runnerState;

  // Job tables; every access happens while holding this object's monitor.
  private final LinkedList<ControlledJob> jobsInProgress =
      new LinkedList<ControlledJob>();
  private final LinkedList<ControlledJob> successfulJobs =
      new LinkedList<ControlledJob>();
  private final LinkedList<ControlledJob> failedJobs =
      new LinkedList<ControlledJob>();

  private long nextJobID;
  private final String groupName;

  /**
   * Construct a job control for a group of jobs.
   * @param groupName a name identifying this group
   */
  public JobControl(String groupName) {
    this.nextJobID = -1;
    this.groupName = groupName;
    this.runnerState = ThreadState.READY;
  }

  /**
   * Takes a snapshot of a job table.
   * @param jobs the table to copy
   * @return a new list holding the same jobs in the same order
   */
  private static List<ControlledJob> toList(Collection<ControlledJob> jobs) {
    return new ArrayList<ControlledJob>(jobs);
  }

  /**
   * @param state the state to filter on
   * @return a new list of the in-progress jobs whose current state is
   *         <code>state</code>
   */
  private synchronized List<ControlledJob> getJobsIn(State state) {
    List<ControlledJob> matching = new LinkedList<ControlledJob>();
    for (ControlledJob job : jobsInProgress) {
      if (job.getJobState() == state) {
        matching.add(job);
      }
    }
    return matching;
  }

  /**
   * @return the jobs in the waiting state
   */
  public List<ControlledJob> getWaitingJobList() {
    return getJobsIn(State.WAITING);
  }

  /**
   * @return the jobs in the running state
   */
  public List<ControlledJob> getRunningJobList() {
    return getJobsIn(State.RUNNING);
  }

  /**
   * @return the jobs in the ready state
   */
  public List<ControlledJob> getReadyJobsList() {
    return getJobsIn(State.READY);
  }

  /**
   * @return the jobs in the success state
   */
  public synchronized List<ControlledJob> getSuccessfulJobList() {
    return toList(this.successfulJobs);
  }

  /**
   * @return the jobs that were moved to the failed table
   */
  public synchronized List<ControlledJob> getFailedJobList() {
    return toList(this.failedJobs);
  }

  /**
   * Generates the next job ID: the group name followed by a counter.
   * Only called from {@link #addJob(ControlledJob)}, which holds the monitor.
   */
  private String getNextJobID() {
    nextJobID += 1;
    return this.groupName + this.nextJobID;
  }

  /**
   * Add a new controlled job.
   * @param aJob the new controlled job
   * @return the ID assigned to the job
   */
  public synchronized String addJob(ControlledJob aJob) {
    String id = this.getNextJobID();
    aJob.setJobID(id);
    aJob.setJobState(State.WAITING);
    jobsInProgress.add(aJob);
    return id;
  }

  /**
   * Add a new job.
   * @param aJob the new job
   * @return the ID assigned to the job
   */
  public synchronized String addJob(Job aJob) {
    // The cast selects the ControlledJob overload instead of recursing.
    return addJob((ControlledJob) aJob);
  }

  /**
   * Add a collection of jobs
   *
   * @param jobs the jobs to add, each through {@link #addJob(ControlledJob)}
   */
  public void addJobCollection(Collection<ControlledJob> jobs) {
    for (ControlledJob job : jobs) {
      addJob(job);
    }
  }

  /**
   * @return the thread state
   */
  public ThreadState getThreadState() {
    return this.runnerState;
  }

  /**
   * set the thread state to STOPPING so that the
   * thread will stop when it wakes up.
   */
  public void stop() {
    this.runnerState = ThreadState.STOPPING;
  }

  /**
   * suspend the running thread
   */
  public void suspend() {
    if (this.runnerState == ThreadState.RUNNING) {
      this.runnerState = ThreadState.SUSPENDED;
    }
  }

  /**
   * resume the suspended thread
   */
  public void resume() {
    if (this.runnerState == ThreadState.SUSPENDED) {
      this.runnerState = ThreadState.RUNNING;
    }
  }

  /**
   * @return true if no job is left in the in-progress table
   */
  public synchronized boolean allFinished() {
    return jobsInProgress.isEmpty();
  }

  /**
   *  The main loop for the thread.
   *  The loop does the following:
   *  	Check the states of the running jobs
   *  	Update the states of waiting jobs
   *  	Submit the jobs in ready state
   *
   *  An interrupt only cuts the current sleep short; the loop keeps going
   *  until the state is changed through {@link #stop()}. The interrupt is
   *  remembered and the thread's interrupt status is restored on exit.
   *
   *  @throws IllegalArgumentException if the jobs added so far contain a
   *          circular dependency; in that case the thread state is not
   *          changed
   */
  public void run() {
    final boolean circular;
    // jobsInProgress is guarded by the monitor everywhere else as well.
    synchronized (this) {
      circular = isCircular(jobsInProgress);
    }
    if (circular) {
      throw new IllegalArgumentException("job control has circular dependency");
    }

    boolean interrupted = false;
    try {
      this.runnerState = ThreadState.RUNNING;
      while (true) {
        while (this.runnerState == ThreadState.SUSPENDED) {
          interrupted |= sleepOnePollInterval();
        }

        pollJobs();

        if (!isActive()) {
          break;
        }
        interrupted |= sleepOnePollInterval();
        if (!isActive()) {
          break;
        }
      }
    } catch (Throwable t) {
      LOG.error("Error while trying to run jobs.", t);
      //Mark all jobs as failed because we got something bad.
      failAllJobs(t);
    } finally {
      this.runnerState = ThreadState.STOPPED;
      if (interrupted) {
        // Hand the interrupt back to whoever owns this thread.
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Runs one pass over the in-progress jobs: finished jobs are moved to the
   * successful or failed table and jobs reported as READY are submitted.
   */
  private synchronized void pollJobs() throws IOException, InterruptedException {
    Iterator<ControlledJob> it = jobsInProgress.iterator();
    while (it.hasNext()) {
      ControlledJob j = it.next();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Checking state of job " + j);
      }
      switch (j.checkState()) {
      case SUCCESS:
        successfulJobs.add(j);
        it.remove();
        break;
      case FAILED:
      case DEPENDENT_FAILED:
        failedJobs.add(j);
        it.remove();
        break;
      case READY:
        j.submit();
        break;
      case RUNNING:
      case WAITING:
        //Do Nothing
        break;
      }
    }
  }

  /**
   * @return true while the thread state is RUNNING or SUSPENDED, i.e. while
   *         the main loop should keep going
   */
  private boolean isActive() {
    ThreadState current = this.runnerState;
    return current == ThreadState.RUNNING || current == ThreadState.SUSPENDED;
  }

  /**
   * Sleeps for one polling interval.
   * @return true if the sleep was cut short by an interrupt (the interrupt
   *         status is cleared; the caller is expected to restore it later)
   */
  private static boolean sleepOnePollInterval() {
    try {
      Thread.sleep(POLL_INTERVAL_MS);
      return false;
    } catch (InterruptedException e) {
      return true;
    }
  }

  /**
   * Fails every job still in progress and moves it to the failed table.
   * A job is moved even if failing it throws.
   * @param t the error that made the control thread give up
   */
  private synchronized void failAllJobs(Throwable t) {
    String message = "Unexpected System Error Occured: "+
    StringUtils.stringifyException(t);
    Iterator<ControlledJob> it = jobsInProgress.iterator();
    while (it.hasNext()) {
      ControlledJob j = it.next();
      try {
        j.failJob(message);
      } catch (IOException e) {
        LOG.error("Error while tyring to clean up "+j.getJobName(), e);
      } catch (InterruptedException e) {
        LOG.error("Error while tyring to clean up "+j.getJobName(), e);
      } finally {
        failedJobs.add(j);
        it.remove();
      }
    }
  }

  /**
   * Looks for a circular dependency with a depth-first search that follows
   * the edges returned by {@link ControlledJob#getDependentJobs()}.
   *
   * A job that lists itself counts as a cycle, and jobs that are reachable
   * through those edges but were never added to this group are searched too.
   *
   * @param jobList the jobs to start the search from
   * @return true if a cycle is reachable from any job in the list
   */
  private boolean isCircular(final List<ControlledJob> jobList) {
    // Jobs on the path currently being explored; meeting one again closes
    // a cycle.
    Set<ControlledJob> onPath = new HashSet<ControlledJob>();
    // Jobs whose whole sub-graph is already known to be cycle free.
    Set<ControlledJob> explored = new HashSet<ControlledJob>();
    for (ControlledJob job : jobList) {
      if (reachesCycle(job, onPath, explored)) {
        LOG.error("Job control has circular dependency for the job "
            + job.getJobName());
        return true;
      }
    }
    return false;
  }

  /**
   * Recursive step of the search done by {@link #isCircular(List)}.
   * @return true if following the edges from <code>job</code> leads back to
   *         a job on the current path
   */
  private boolean reachesCycle(ControlledJob job, Set<ControlledJob> onPath,
      Set<ControlledJob> explored) {
    if (explored.contains(job)) {
      return false;
    }
    if (!onPath.add(job)) {
      // Already on the current path: back edge.
      return true;
    }
    List<ControlledJob> next = job.getDependentJobs();
    if (next != null) {
      for (ControlledJob neighbour : next) {
        if (reachesCycle(neighbour, onPath, explored)) {
          return true;
        }
      }
    }
    onPath.remove(job);
    explored.add(job);
    return false;
  }
}