您的位置:首页 > 编程语言

hadoop 2.6 Yarn Service源代码分析

2016-02-24 15:35 204 查看
Service有四种状态,用一个枚举来表示。

/**
* Service states
*/
public enum STATE {
/** Constructed but not initialized */
NOTINITED(0, "NOTINITED"),

/** Initialized but not started or stopped */
INITED(1, "INITED"),

/** started and not stopped */
STARTED(2, "STARTED"),

/** stopped. No further state transitions are permitted */
STOPPED(3, "STOPPED");

/**
* An integer value for use in array lookup and JMX interfaces.
* Although {@link Enum#ordinal()} could do this, explicitly
* identify the numbers gives more stability guarantees over time.
*/
private final int value;

/**
* A name of the state that can be used in messages
*/
private final String statename;

private STATE(int value, String name) {
this.value = value;
this.statename = name;
}

/**
* Get the integer value of a state
* @return the numeric value of the state
*/
public int getValue() {
return value;
}

/**
* Get the name of a state
* @return the state's name
*/
@Override
public String toString() {
return statename;
}
}


Service 有以下方法:

/**
* Initialize the service.
*
* The transition MUST be from {@link STATE#NOTINITED} to {@link STATE#INITED}
* unless the operation failed and an exception was raised, in which case
* {@link #stop()} MUST be invoked and the service enter the state
* {@link STATE#STOPPED}.
* @param config the configuration of the service
* @throws RuntimeException on any failure during the operation

*/
void init(Configuration config);

/**
* Start the service.
*
* The transition MUST be from {@link STATE#INITED} to {@link STATE#STARTED}
* unless the operation failed and an exception was raised, in which case
* {@link #stop()} MUST be invoked and the service enter the state
* {@link STATE#STOPPED}.
* @throws RuntimeException on any failure during the operation
*/

void start();

/**
* Stop the service. This MUST be a no-op if the service is already
* in the {@link STATE#STOPPED} state. It SHOULD be a best-effort attempt
* to stop all parts of the service.
*
* The implementation must be designed to complete regardless of the service
* state, including the initialized/uninitialized state of all its internal
* fields.
* @throws RuntimeException on any failure during the stop operation
*/
void stop();

/**
* A version of stop() that is designed to be usable in Java7 closure
* clauses.
* Implementation classes MUST relay this directly to {@link #stop()}
* @throws IOException never
* @throws RuntimeException on any failure during the stop operation
*/
void close() throws IOException;

/**
* Register a listener to the service state change events.
* If the supplied listener is already listening to this service,
* this method is a no-op.
* @param listener a new listener
*/
void registerServiceListener(ServiceStateChangeListener listener);

/**
* Unregister a previously registered listener of the service state
* change events. No-op if the listener is already unregistered.
* @param listener the listener to unregister.
*/
void unregisterServiceListener(ServiceStateChangeListener listener);

/**
* Get the name of this service.
* @return the service name
*/
String getName();

/**
* Get the configuration of this service.
* This is normally not a clone and may be manipulated, though there are no
* guarantees as to what the consequences of such actions may be
* @return the current configuration, unless a specific implentation chooses
* otherwise.
*/
Configuration getConfig();

/**
* Get the current service state
* @return the state of the service
*/
STATE getServiceState();

/**
* Get the service start time
* @return the start time of the service. This will be zero if the service
* has not yet been started.
*/
long getStartTime();

/**
* Query to see if the service is in a specific state.
* In a multi-threaded system, the state may not hold for very long.
* @param state the expected state
* @return true if, at the time of invocation, the service was in that state.
*/
boolean isInState(STATE state);

/**
* Get the first exception raised during the service failure. If null,
* no exception was logged
* @return the failure logged during a transition to the stopped state
*/
Throwable getFailureCause();

/**
* Get the state in which the failure in {@link #getFailureCause()} occurred.
* @return the state or null if there was no failure
*/
STATE getFailureState();

/**
* Block waiting for the service to stop; uses the termination notification
* object to do so.
*
* This method will only return after all the service stop actions
* have been executed (to success or failure), or the timeout elapsed
* This method can be called before the service is inited or started; this is
* to eliminate any race condition with the service stopping before
* this event occurs.
* @param timeout timeout in milliseconds. A value of zero means "forever"
* @return true iff the service stopped in the time period
*/
boolean waitForServiceToStop(long timeout);

/**
* Get a snapshot of the lifecycle history; it is a static list
* @return a possibly empty but never null list of lifecycle events.
*/
public List<LifecycleEvent> getLifecycleHistory();

/**
* Get the blockers on a service -remote dependencies
* that are stopping the service from being <i>live</i>.
* @return a (snapshotted) map of blocker name->description values
*/
public Map<String, String> getBlockers();
}


ServiceStateModel

/**
* Implements the service state model.
*/
@Public
@Evolving
public class ServiceStateModel {

/**
* Map of all valid state transitions
* [current] [proposed1, proposed2, ...]
*/
private static final boolean[][] statemap =
{
//                uninited inited started stopped
/* uninited  */    {false, true,  false,  true},
/* inited    */    {false, true,  true,   true},
/* started   */    {false, false, true,   true},
/* stopped   */    {false, false, false,  true},
};

/**
* The state of the service
*/
private volatile Service.STATE state;

/**
* The name of the service: used in exceptions
*/
private String name;

/**
* Create the service state model in the {@link Service.STATE#NOTINITED}
* state.
*/
public ServiceStateModel(String name) {
this(name, Service.STATE.NOTINITED);
}

/**
* Create a service state model instance in the chosen state
* @param state the starting state
*/
public ServiceStateModel(String name, Service.STATE state) {
this.state = state;
this.name = name;
}

/**
* Query the service state. This is a non-blocking operation.
* @return the state
*/
public Service.STATE getState() {
return state;
}

/**
* Query that the state is in a specific state
* @param proposed proposed new state
* @return the state
*/
public boolean isInState(Service.STATE proposed) {
return state.equals(proposed);
}

/**
* Verify that that a service is in a given state.
* @param expectedState the desired state
* @throws ServiceStateException if the service state is different from
* the desired state
*/
public void ensureCurrentState(Service.STATE expectedState) {
if (state != expectedState) {
throw new ServiceStateException(name+ ": for this operation, the " +
"current service state must be "
+ expectedState
+ " instead of " + state);
}
}

/**
* Enter a state -thread safe.
*
* @param proposed proposed new state
* @return the original state
* @throws ServiceStateException if the transition is not permitted
*/
public synchronized Service.STATE enterState(Service.STATE proposed) {
checkStateTransition(name, state, proposed);
Service.STATE oldState = state;
//atomic write of the new state
state = proposed;
return oldState;
}

/**
* Check that a state tansition is valid and
* throw an exception if not
* @param name name of the service (can be null)
* @param state current state
* @param proposed proposed new state
*/
public static void checkStateTransition(String name,
Service.STATE state,
Service.STATE proposed) {
if (!isValidStateTransition(state, proposed)) {
throw new ServiceStateException(name + " cannot enter state "
+ proposed + " from state " + state);
}
}

/**
* Is a state transition valid?
* There are no checks for current==proposed
* as that is considered a non-transition.
*
* using an array kills off all branch misprediction costs, at the expense
* of cache line misses.
*
* @param current current state
* @param proposed proposed new state
* @return true if the transition to a new state is valid
*/
public static boolean isValidStateTransition(Service.STATE current,
Service.STATE proposed) {
boolean[] row = statemap[current.getValue()];
return row[proposed.getValue()];
}

/**
* return the state text as the toString() value
* @return the current state's description
*/
@Override
public String toString() {
return (name.isEmpty() ? "" : ((name) + ": "))
+ state.toString();
}

}


/**
* Interface to notify state changes of a service.
*/
@Public
@Stable
public interface ServiceStateChangeListener {

/**
* Callback to notify of a state change. The service will already
* have changed state before this callback is invoked.
*
* This operation is invoked on the thread that initiated the state change,
* while the service itself in in a sychronized section.
* <ol>
*   <li>Any long-lived operation here will prevent the service state
*   change from completing in a timely manner.</li>
*   <li>If another thread is somehow invoked from the listener, and
*   that thread invokes the methods of the service (including
*   subclass-specific methods), there is a risk of a deadlock.</li>
* </ol>
*
*
* @param service the service that has changed.
*/
void stateChanged(Service service);

}


/**
* Class to manage a list of {@link ServiceStateChangeListener} instances,
* including a notification loop that is robust against changes to the list
* during the notification process.
*/
public static class ServiceListeners {
/**
* List of state change listeners; it is final to guarantee
* that it will never be null.
*/
private final List<ServiceStateChangeListener> listeners =
new ArrayList<ServiceStateChangeListener>();

/**
* Thread-safe addition of a new listener to the end of a list.
* Attempts to re-register a listener that is already registered
* will be ignored.
* @param l listener
*/
public synchronized void add(ServiceStateChangeListener l) {
if(!listeners.contains(l)) {
listeners.add(l);
}
}

/**
* Remove any registration of a listener from the listener list.
* @param l listener
* @return true if the listener was found (and then removed)
*/
public synchronized boolean remove(ServiceStateChangeListener l) {
return listeners.remove(l);
}

/**
* Reset the listener list
*/
public synchronized void reset() {
listeners.clear();
}

/**
* Change to a new state and notify all listeners.
* This method will block until all notifications have been issued.
* It caches the list of listeners before the notification begins,
* so additions or removal of listeners will not be visible.
* @param service the service that has changed state
*/
public void notifyListeners(Service service) {
//take a very fast snapshot of the callback list
//very much like CopyOnWriteArrayList, only more minimal
ServiceStateChangeListener[] callbacks;
synchronized (this) {
callbacks = listeners.toArray(new ServiceStateChangeListener[listeners.size()]);
}
//iterate through the listeners outside the synchronized method,
//ensuring that listener registration/unregistration doesn't break anything
for (ServiceStateChangeListener l : callbacks) {
l.stateChanged(service);
}
}
}


ServiceOp0erations的其它方法如下:

public final class ServiceOperations {
private static final Log LOG = LogFactory.getLog(AbstractService.class);

private ServiceOperations() {
}

/**
* Stop a service.
* <p/>Do nothing if the service is null or not
* in a state in which it can be/needs to be stopped.
* <p/>
* The service state is checked <i>before</i> the operation begins.
* This process is <i>not</i> thread safe.
* @param service a service or null
*/
public static void stop(Service service) {
if (service != null) {
service.stop();
}
}

/**
* Stop a service; if it is null do nothing. Exceptions are caught and
* logged at warn level. (but not Throwables). This operation is intended to
* be used in cleanup operations
*
* @param service a service; may be null
* @return any exception that was caught; null if none was.
*/
public static Exception stopQuietly(Service service) {
return stopQuietly(LOG, service);
}

/**
* Stop a service; if it is null do nothing. Exceptions are caught and
* logged at warn level. (but not Throwables). This operation is intended to
* be used in cleanup operations
*
* @param log the log to warn at
* @param service a service; may be null
* @return any exception that was caught; null if none was.
* @see ServiceOperations#stopQuietly(Service)
*/
public static Exception stopQuietly(Log log, Service service) {
try {
stop(service);
} catch (Exception e) {
log.warn("When stopping the service " + service.getName()
+ " : " + e,
e);
return e;
}
return null;
}


AbstractService的实现了Service,代码如下:

/**
* This is the base implementation class for services.
*/
@Public
@Evolving
public abstract class AbstractService implements Service {

private static final Log LOG = LogFactory.getLog(AbstractService.class);

/**
* Service name.
*/
private final String name;

/** service state */
private final ServiceStateModel stateModel;

/**
* Service start time. Will be zero until the service is started.
*/
private long startTime;

/**
* The configuration. Will be null until the service is initialized.
*/
private volatile Configuration config;

/**
* List of state change listeners; it is final to ensure
* that it will never be null.
*/
private final ServiceOperations.ServiceListeners listeners
= new ServiceOperations.ServiceListeners();
/**
* Static listeners to all events across all services
*/
private static ServiceOperations.ServiceListeners globalListeners
= new ServiceOperations.ServiceListeners();

/**
* The cause of any failure -will be null.
* if a service did not stop due to a failure.
*/
private Exception failureCause;

/**
* the state in which the service was when it failed.
* Only valid when the service is stopped due to a failure
*/
private STATE failureState = null;

/**
* object used to co-ordinate {@link #waitForServiceToStop(long)}
* across threads.
*/
private final AtomicBoolean terminationNotification =
new AtomicBoolean(false);

/**
* History of lifecycle transitions
*/
private final List<LifecycleEvent> lifecycleHistory
= new ArrayList<LifecycleEvent>(5);

/**
* Map of blocking dependencies
*/
private final Map<String,String> blockerMap = new HashMap<String, String>();

private final Object stateChangeLock = new Object();

/**
* Construct the service.
* @param name service name
*/
public AbstractService(String name) {
this.name = name;
stateModel = new ServiceStateModel(name);
}

@Override
public final STATE getServiceState() {
return stateModel.getState();
}

@Override
public final synchronized Throwable getFailureCause() {
return failureCause;
}

@Override
public synchronized STATE getFailureState() {
return failureState;
}

/**
* Set the configuration for this service.
* This method is called during {@link #init(Configuration)}
* and should only be needed if for some reason a service implementation
* needs to override that initial setting -for example replacing
* it with a new subclass of {@link Configuration}
* @param conf new configuration.
*/
protected void setConfig(Configuration conf) {
this.config = conf;
}

/**
* {@inheritDoc}
* This invokes {@link #serviceInit}
* @param conf the configuration of the service. This must not be null
* @throws ServiceStateException if the configuration was null,
* the state change not permitted, or something else went wrong
*/
@Override
public void init(Configuration conf) {
if (conf == null) {
throw new ServiceStateException("Cannot initialize service "
+ getName() + ": null configuration");
}
if (isInState(STATE.INITED)) {
return;
}
synchronized (stateChangeLock) {
if (enterState(STATE.INITED) != STATE.INITED) {
setConfig(conf);
try {
serviceInit(config);
if (isInState(STATE.INITED)) {
//if the service ended up here during init,
//notify the listeners
notifyListeners();
}
} catch (Exception e) {
noteFailure(e);
ServiceOperations.stopQuietly(LOG, this);
throw ServiceStateException.convert(e);
}
}
}
}

/**
* {@inheritDoc}
* @throws ServiceStateException if the current service state does not permit
* this action
*/
@Override
public void start() {
if (isInState(STATE.STARTED)) {
return;
}
//enter the started state
synchronized (stateChangeLock) {
if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {
try {
startTime = System.currentTimeMillis();
serviceStart();
if (isInState(STATE.STARTED)) {
//if the service started (and isn't now in a later state), notify
if (LOG.isDebugEnabled()) {
LOG.debug("Service " + getName() + " is started");
}
notifyListeners();
}
} catch (Exception e) {
noteFailure(e);
ServiceOperations.stopQuietly(LOG, this);
throw ServiceStateException.convert(e);
}
}
}
}

/**
* {@inheritDoc}
*/
@Override
public void stop() {
if (isInState(STATE.STOPPED)) {
return;
}
synchronized (stateChangeLock) {
if (enterState(STATE.STOPPED) != STATE.STOPPED) {
try {
serviceStop();
} catch (Exception e) {
//stop-time exceptions are logged if they are the first one,
noteFailure(e);
throw ServiceStateException.convert(e);
} finally {
//report that the service has terminated
terminationNotification.set(true);
synchronized (terminationNotification) {
terminationNotification.notifyAll();
}
//notify anything listening for events
notifyListeners();
}
} else {
//already stopped: note it
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring re-entrant call to stop()");
}
}
}
}

/**
* Relay to {@link #stop()}
* @throws IOException
*/
@Override
public final void close() throws IOException {
stop();
}

/**
* Failure handling: record the exception
* that triggered it -if there was not one already.
* Services are free to call this themselves.
* @param exception the exception
*/
protected final void noteFailure(Exception exception) {
if (LOG.isDebugEnabled()) {
LOG.debug("noteFailure " + exception, null);
}
if (exception == null) {
//make sure failure logic doesn't itself cause problems
return;
}
//record the failure details, and log it
synchronized (this) {
if (failureCause == null) {
failureCause = exception;
failureState = getServiceState();
LOG.info("Service " + getName()
+ " failed in state " + failureState
+ "; cause: " + exception,
exception);
}
}
}

@Override
public final boolean waitForServiceToStop(long timeout) {
boolean completed = terminationNotification.get();
while (!completed) {
try {
synchronized(terminationNotification) {
terminationNotification.wait(timeout);
}
// here there has been a timeout, the object has terminated,
// or there has been a spurious wakeup (which we ignore)
completed = true;
} catch (InterruptedException e) {
// interrupted; have another look at the flag
completed = terminationNotification.get();
}
}
return terminationNotification.get();
}

/* ===================================================================== */
/* Override Points */
/* ===================================================================== */

/**
* All initialization code needed by a service.
*
* This method will only ever be called once during the lifecycle of
* a specific service instance.
*
* Implementations do not need to be synchronized as the logic
* in {@link #init(Configuration)} prevents re-entrancy.
*
* The base implementation checks to see if the subclass has created
* a new configuration instance, and if so, updates the base class value
* @param conf configuration
* @throws Exception on a failure -these will be caught,
* possibly wrapped, and wil; trigger a service stop
*/
protected void serviceInit(Configuration conf) throws Exception {
if (conf != config) {
LOG.debug("Config has been overridden during init");
setConfig(conf);
}
}

/**
* Actions called during the INITED to STARTED transition.
*
* This method will only ever be called once during the lifecycle of
* a specific service instance.
*
* Implementations do not need to be synchronized as the logic
* in {@link #start()} prevents re-entrancy.
*
* @throws Exception if needed -these will be caught,
* wrapped, and trigger a service stop
*/
protected void serviceStart() throws Exception {

}

/**
* Actions called during the transition to the STOPPED state.
*
* This method will only ever be called once during the lifecycle of
* a specific service instance.
*
* Implementations do not need to be synchronized as the logic
* in {@link #stop()} prevents re-entrancy.
*
* Implementations MUST write this to be robust against failures, including
* checks for null references -and for the first failure to not stop other
* attempts to shut down parts of the service.
*
* @throws Exception if needed -these will be caught and logged.
*/
protected void serviceStop() throws Exception {

}

@Override
public void registerServiceListener(ServiceStateChangeListener l) {
listeners.add(l);
}

@Override
public void unregisterServiceListener(ServiceStateChangeListener l) {
listeners.remove(l);
}

/**
* Register a global listener, which receives notifications
* from the state change events of all services in the JVM
* @param l listener
*/
public static void registerGlobalListener(ServiceStateChangeListener l) {
globalListeners.add(l);
}

/**
* unregister a global listener.
* @param l listener to unregister
* @return true if the listener was found (and then deleted)
*/
public static boolean unregisterGlobalListener(ServiceStateChangeListener l) {
return globalListeners.remove(l);
}

/**
* Package-scoped method for testing -resets the global listener list
*/
@VisibleForTesting
static void resetGlobalListeners() {
globalListeners.reset();
}

@Override
public String getName() {
return name;
}

@Override
public synchronized Configuration getConfig() {
return config;
}

@Override
public long getStartTime() {
return startTime;
}

/**
* Notify local and global listeners of state changes.
* Exceptions raised by listeners are NOT passed up.
*/
private void notifyListeners() {
try {
listeners.notifyListeners(this);
globalListeners.notifyListeners(this);
} catch (Throwable e) {
LOG.warn("Exception while notifying listeners of " + this + ": " + e,
e);
}
}

/**
* Add a state change event to the lifecycle history
*/
private void recordLifecycleEvent() {
LifecycleEvent event = new LifecycleEvent();
event.time = System.currentTimeMillis();
event.state = getServiceState();
lifecycleHistory.add(event);
}

@Override
public synchronized List<LifecycleEvent> getLifecycleHistory() {
return new ArrayList<LifecycleEvent>(lifecycleHistory);
}

/**
* Enter a state; record this via {@link #recordLifecycleEvent}
* and log at the info level.
* @param newState the proposed new state
* @return the original state
* it wasn't already in that state, and the state model permits state re-entrancy.
*/
private STATE enterState(STATE newState) {
assert stateModel != null : "null state in " + name + " " + this.getClass();
STATE oldState = stateModel.enterState(newState);
if (oldState != newState) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Service: " + getName() + " entered state " + getServiceState());
}
recordLifecycleEvent();
}
return oldState;
}

@Override
public final boolean isInState(Service.STATE expected) {
return stateModel.isInState(expected);
}

@Override
public String toString() {
return "Service " + name + " in state " + stateModel;
}

/**
* Put a blocker to the blocker map -replacing any
* with the same name.
* @param name blocker name
* @param details any specifics on the block. This must be non-null.
*/
protected void putBlocker(String name, String details) {
synchronized (blockerMap) {
blockerMap.put(name, details);
}
}

/**
* Remove a blocker from the blocker map -
* this is a no-op if the blocker is not present
* @param name the name of the blocker
*/
public void removeBlocker(String name) {
synchronized (blockerMap) {
blockerMap.remove(name);
}
}

@Override
public Map<String, String> getBlockers() {
synchronized (blockerMap) {
Map<String, String> map = new HashMap<String, String>(blockerMap);
return map;
}
}
}


Dispatcher的接口定义如下:

public interface Dispatcher {

// Configuration to make sure dispatcher crashes but doesn't do system-exit in
// case of errors. By default, it should be false, so that tests are not
// affected. For all daemons it should be explicitly set to true so that
// daemons can crash instead of hanging around.
public static final String DISPATCHER_EXIT_ON_ERROR_KEY =
"yarn.dispatcher.exit-on-error";

public static final boolean DEFAULT_DISPATCHER_EXIT_ON_ERROR = false;

EventHandler getEventHandler();

void register(Class<? extends Enum> eventType, EventHandler handler);

}


@SuppressWarnings("rawtypes")
@Public
@Evolving
public interface EventHandler<T extends Event> {

void handle(T event);

}


/**
* Multiplexing an event. Sending it to different handlers that
* are interested in the event.
* @param <T> the type of event these multiple handlers are interested in.
*/
static class MultiListenerHandler implements EventHandler<Event> {
List<EventHandler<Event>> listofHandlers;

public MultiListenerHandler() {
listofHandlers = new ArrayList<EventHandler<Event>>();
}

@Override
public void handle(Event event) {
for (EventHandler<Event> handler: listofHandlers) {
handler.handle(event);
}
}

void addHandler(EventHandler<Event> handler) {
listofHandlers.add(handler);
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: