001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.jobcontrol;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.Iterator;
025import java.util.LinkedList;
026import java.util.List;
027import java.util.HashMap;
028import java.util.HashSet;
029
030import org.apache.commons.logging.Log;
031import org.apache.commons.logging.LogFactory;
032import org.apache.hadoop.classification.InterfaceAudience;
033import org.apache.hadoop.classification.InterfaceStability;
034import org.apache.hadoop.mapred.jobcontrol.Job;
035import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob.State;
036import org.apache.hadoop.util.StringUtils;
037
038/** 
039 *  This class encapsulates a set of MapReduce jobs and its dependency.
040 *   
041 *  It tracks the states of the jobs by placing them into different tables
042 *  according to their states. 
043 *  
044 *  This class provides APIs for the client app to add a job to the group 
045 *  and to get the jobs in the group in different states. When a job is 
046 *  added, an ID unique to the group is assigned to the job. 
047 *  
048 *  This class has a thread that submits jobs when they become ready, 
049 *  monitors the states of the running jobs, and updates the states of jobs
050 *  based on the state changes of their depending jobs states. The class 
051 *  provides APIs for suspending/resuming the thread, and 
052 *  for stopping the thread.
053 *  
054 */
055@InterfaceAudience.Public
056@InterfaceStability.Evolving
057public class JobControl implements Runnable {
058  private static final Log LOG = LogFactory.getLog(JobControl.class);
059
060  // The thread can be in one of the following state
061  public static enum ThreadState {RUNNING, SUSPENDED,STOPPED, STOPPING, READY};
062        
063  private ThreadState runnerState;                      // the thread state
064        
065  private LinkedList<ControlledJob> jobsInProgress = new LinkedList<ControlledJob>();
066  private LinkedList<ControlledJob> successfulJobs = new LinkedList<ControlledJob>();
067  private LinkedList<ControlledJob> failedJobs = new LinkedList<ControlledJob>();
068        
069  private long nextJobID;
070  private String groupName;
071        
072  /** 
073   * Construct a job control for a group of jobs.
074   * @param groupName a name identifying this group
075   */
076  public JobControl(String groupName) {
077    this.nextJobID = -1;
078    this.groupName = groupName;
079    this.runnerState = ThreadState.READY;
080  }
081        
082  private static List<ControlledJob> toList(
083                   LinkedList<ControlledJob> jobs) {
084    ArrayList<ControlledJob> retv = new ArrayList<ControlledJob>();
085    for (ControlledJob job : jobs) {
086      retv.add(job);
087    }
088    return retv;
089  }
090        
091  synchronized private List<ControlledJob> getJobsIn(State state) {
092    LinkedList<ControlledJob> l = new LinkedList<ControlledJob>();
093    for(ControlledJob j: jobsInProgress) {
094      if(j.getJobState() == state) {
095        l.add(j);
096      }
097    }
098    return l;
099  }
100  
101  /**
102   * @return the jobs in the waiting state
103   */
104  public List<ControlledJob> getWaitingJobList() {
105    return getJobsIn(State.WAITING);
106  }
107        
108  /**
109   * @return the jobs in the running state
110   */
111  public List<ControlledJob> getRunningJobList() {
112    return getJobsIn(State.RUNNING);
113  }
114        
115  /**
116   * @return the jobs in the ready state
117   */
118  public List<ControlledJob> getReadyJobsList() {
119    return getJobsIn(State.READY);
120  }
121        
122  /**
123   * @return the jobs in the success state
124   */
125  synchronized public List<ControlledJob> getSuccessfulJobList() {
126    return toList(this.successfulJobs);
127  }
128        
129  synchronized public List<ControlledJob> getFailedJobList() {
130    return toList(this.failedJobs);
131  }
132        
133  private String getNextJobID() {
134    nextJobID += 1;
135    return this.groupName + this.nextJobID;
136  }
137
138  /**
139   * Add a new controlled job.
140   * @param aJob the new controlled job
141   */
142  synchronized public String addJob(ControlledJob aJob) {
143    String id = this.getNextJobID();
144    aJob.setJobID(id);
145    aJob.setJobState(State.WAITING);
146    jobsInProgress.add(aJob);
147    return id;  
148  }
149
150  /**
151   * Add a new job.
152   * @param aJob the new job
153   */
154  synchronized public String addJob(Job aJob) {
155    return addJob((ControlledJob) aJob);
156  }
157
158  /**
159   * Add a collection of jobs
160   * 
161   * @param jobs
162   */
163  public void addJobCollection(Collection<ControlledJob> jobs) {
164    for (ControlledJob job : jobs) {
165      addJob(job);
166    }
167  }
168        
169  /**
170   * @return the thread state
171   */
172  public ThreadState getThreadState() {
173    return this.runnerState;
174  }
175        
176  /**
177   * set the thread state to STOPPING so that the 
178   * thread will stop when it wakes up.
179   */
180  public void stop() {
181    this.runnerState = ThreadState.STOPPING;
182  }
183        
184  /**
185   * suspend the running thread
186   */
187  public void suspend () {
188    if (this.runnerState == ThreadState.RUNNING) {
189      this.runnerState = ThreadState.SUSPENDED;
190    }
191  }
192        
193  /**
194   * resume the suspended thread
195   */
196  public void resume () {
197    if (this.runnerState == ThreadState.SUSPENDED) {
198      this.runnerState = ThreadState.RUNNING;
199    }
200  }
201        
202  synchronized public boolean allFinished() {
203    return jobsInProgress.isEmpty();
204  }
205        
206  /**
207   *  The main loop for the thread.
208   *  The loop does the following:
209   *    Check the states of the running jobs
210   *    Update the states of waiting jobs
211   *    Submit the jobs in ready state
212   */
213  public void run() {
214    if (isCircular(jobsInProgress)) {
215      throw new IllegalArgumentException("job control has circular dependency");
216    }
217    try {
218      this.runnerState = ThreadState.RUNNING;
219      while (true) {
220        while (this.runnerState == ThreadState.SUSPENDED) {
221          try {
222            Thread.sleep(5000);
223          }
224          catch (Exception e) {
225            //TODO the thread was interrupted, do something!!!
226          }
227        }
228        
229        synchronized(this) {
230          Iterator<ControlledJob> it = jobsInProgress.iterator();
231          while(it.hasNext()) {
232            ControlledJob j = it.next();
233            LOG.debug("Checking state of job "+j);
234            switch(j.checkState()) {
235            case SUCCESS:
236              successfulJobs.add(j);
237              it.remove();
238              break;
239            case FAILED:
240            case DEPENDENT_FAILED:
241              failedJobs.add(j);
242              it.remove();
243              break;
244            case READY:
245              j.submit();
246              break;
247            case RUNNING:
248            case WAITING:
249              //Do Nothing
250              break;
251            }
252          }
253        }
254        
255        if (this.runnerState != ThreadState.RUNNING && 
256            this.runnerState != ThreadState.SUSPENDED) {
257          break;
258        }
259        try {
260          Thread.sleep(5000);
261        }
262        catch (Exception e) {
263          //TODO the thread was interrupted, do something!!!
264        }
265        if (this.runnerState != ThreadState.RUNNING && 
266            this.runnerState != ThreadState.SUSPENDED) {
267          break;
268        }
269      }
270    }catch(Throwable t) {
271      LOG.error("Error while trying to run jobs.",t);
272      //Mark all jobs as failed because we got something bad.
273      failAllJobs(t);
274    }
275    this.runnerState = ThreadState.STOPPED;
276  }
277
278  synchronized private void failAllJobs(Throwable t) {
279    String message = "Unexpected System Error Occured: "+
280    StringUtils.stringifyException(t);
281    Iterator<ControlledJob> it = jobsInProgress.iterator();
282    while(it.hasNext()) {
283      ControlledJob j = it.next();
284      try {
285        j.failJob(message);
286      } catch (IOException e) {
287        LOG.error("Error while tyring to clean up "+j.getJobName(), e);
288      } catch (InterruptedException e) {
289        LOG.error("Error while tyring to clean up "+j.getJobName(), e);
290      } finally {
291        failedJobs.add(j);
292        it.remove();
293      }
294    }
295  }
296
297 /**
298   * Uses topological sorting algorithm for finding circular dependency
299   */
300  private boolean isCircular(final List<ControlledJob> jobList) {
301    boolean cyclePresent = false;
302    HashSet<ControlledJob> SourceSet = new HashSet<ControlledJob>();
303    HashMap<ControlledJob, List<ControlledJob>> processedMap =
304        new HashMap<ControlledJob, List<ControlledJob>>();
305    for (ControlledJob n : jobList) {
306      processedMap.put(n, new ArrayList<ControlledJob>());
307    }
308    for (ControlledJob n : jobList) {
309      if (!hasInComingEdge(n, jobList, processedMap)) {
310        SourceSet.add(n);
311      }
312    }
313    while (!SourceSet.isEmpty()) {
314      ControlledJob controlledJob = SourceSet.iterator().next();
315      SourceSet.remove(controlledJob);
316      if (controlledJob.getDependentJobs() != null) {
317        for (int i = 0; i < controlledJob.getDependentJobs().size(); i++) {
318          ControlledJob depenControlledJob =
319              controlledJob.getDependentJobs().get(i);
320          processedMap.get(controlledJob).add(depenControlledJob);
321          if (!hasInComingEdge(controlledJob, jobList, processedMap)) {
322            SourceSet.add(depenControlledJob);
323          }
324        }
325      }
326    }
327
328    for (ControlledJob controlledJob : jobList) {
329      if (controlledJob.getDependentJobs() != null
330          && controlledJob.getDependentJobs().size() != processedMap.get(
331              controlledJob).size()) {
332        cyclePresent = true;
333        LOG.error("Job control has circular dependency for the  job "
334            + controlledJob.getJobName());
335        break;
336      }
337    }
338    return cyclePresent;
339  }
340
341  private boolean hasInComingEdge(ControlledJob controlledJob,
342      List<ControlledJob> controlledJobList,
343      HashMap<ControlledJob, List<ControlledJob>> processedMap) {
344    boolean hasIncomingEdge = false;
345    for (ControlledJob k : controlledJobList) {
346      if (k != controlledJob && k.getDependentJobs() != null
347          && !processedMap.get(k).contains(controlledJob)
348          && k.getDependentJobs().contains(controlledJob)) {
349        hasIncomingEdge = true;
350        break;
351      }
352    }
353    return hasIncomingEdge;
354
355  }
356}