Hadoop Yarn中事件驱动模型是3个重要的基础设计之一[另外两个是RPC和状态机]。

Event

Hadoop的事件接口定义Event,这里需要注意的是TYPE extends Enum<TYPE>。个人理解是Event接口定义了TYPE这个泛型,同时这个泛型继承Enum<TYPE>。所以每个实现了Event的接口,都要符合TYPE extends Enum<TYPE>

事件类型[EventType]则是简单的枚举类。

public interface Event<TYPE extends Enum<TYPE>> {
  TYPE getType();
  long getTimestamp();
  String toString();
}

Yarn定义了一个抽象类AbstractEvent并实现了Event接口,而Yarn中其他事件都是继承AbstractEvent来实现的。

public abstract class AbstractEvent<TYPE extends Enum<TYPE>> implements Event<TYPE> {
  private final TYPE type;
  private final long timestamp;

  // use this if you DON'T care about the timestamp
  public AbstractEvent(TYPE type) {
    this.type = type;
    // We're not generating a real timestamp here.  It's too expensive.
    timestamp = -1L;
  }

  // use this if you care about the timestamp
  public AbstractEvent(TYPE type, long timestamp) {
    this.type = type;
    this.timestamp = timestamp;
  }

  @Override
  public long getTimestamp() {
    return timestamp;
  }

  @Override
  public TYPE getType() {
    return type;
  }

  @Override
  public String toString() {
    return "EventType: " + getType();
  }
}

继承AbstractEvent的时候,就会传入具体的事件类型[事件类型被定义为枚举类型]。比如,

public enum AMLauncherEventType {
  LAUNCH,
  CLEANUP
}

所以说到底,Yarn中的事件都是枚举类型。


EventHandler

Yarn定义了EventHandler接口,其中接收的泛型类型要求T extends Event

public interface EventHandler<T extends Event> {
    void handle(T event);
}

比如,ApplicationMasterLauncher就实现了EventHandler并定义了handle方法

@Override
  public synchronized void handle(AMLauncherEvent appEvent) {
    AMLauncherEventType event = appEvent.getType();
    RMAppAttempt application = appEvent.getAppAttempt();
    switch (event) {
    case LAUNCH:
      launch(application);
      break;
    case CLEANUP:
      cleanup(application);
      break;
    default:
      break;
    }
  }

Dispatcher

Dispatcher的主要作用是通过不同的事件类型[EventType]找到相应的handler对事件[Event]进行处理。

Dispatcher接口中,两个基本方法register和getEventHandler。register在AsyncDispatcher使用之前就需要先注册eventType和对应的EventHandler,而getEventHandler方法主要则是把事件(event)放入eventQueue中。register底层就是一个KV Store[这里是HashMap]把Event和EentHandler对应起来,可以快速根据Event找到Handler。

public interface Dispatcher {
  public static final String DISPATCHER_EXIT_ON_ERROR_KEY =
      "yarn.dispatcher.exit-on-error";

  public static final boolean DEFAULT_DISPATCHER_EXIT_ON_ERROR = false;

  EventHandler getEventHandler();

  void register(Class<? extends Enum> eventType, EventHandler handler);
}

AsyncDispatcher作为主要的事件分发器实现了Dispatcher接口,另外两个分发器类DrainDispatcher和InlineDispatcher都继承自AsyncDispatcher。

在RM中,几乎所有的事件都通过AsyncDispatcher进行事件的派发


Take ResourceManager as an Example

ResourceManager持有一个Dispatcher对象的实例。

private Dispatcher rmDispatcher;
// Register event handler for RmNodes
nodesListManager = new NodesListManager(rmContext);
rmDispatcher.register(NodesListManagerEventType.class, nodesListManager);