Event in Yarn [Yarn事件机制]
Hadoop Yarn中事件驱动模型是3个重要的基础设计之一[另外两个是RPC和状态机]。
Event
Hadoop的事件接口定义Event,这里需要注意的是TYPE extends Enum<TYPE>
。个人理解是Event接口定义了TYPE
这个泛型,同时这个泛型继承Enum<TYPE>
。所以每个实现了Event的接口,都要符合TYPE extends Enum<TYPE>
。
事件类型[EventType]则是简单的枚举类。
public interface Event<TYPE extends Enum<TYPE>> {
TYPE getType();
long getTimestamp();
String toString();
}
Yarn定义了一个抽象类AbstractEvent
并实现了Event接口,而Yarn中其他事件都是继承AbstractEvent
来实现的。
public abstract class AbstractEvent<TYPE extends Enum<TYPE>> implements Event<TYPE> {
private final TYPE type;
private final long timestamp;
// use this if you DON'T care about the timestamp
public AbstractEvent(TYPE type) {
this.type = type;
// We're not generating a real timestamp here. It's too expensive.
timestamp = -1L;
}
// use this if you care about the timestamp
public AbstractEvent(TYPE type, long timestamp) {
this.type = type;
this.timestamp = timestamp;
}
@Override
public long getTimestamp() {
return timestamp;
}
@Override
public TYPE getType() {
return type;
}
@Override
public String toString() {
return "EventType: " + getType();
}
}
继承AbstractEvent的时候,就会传入具体的事件类型[事件类型被定义为枚举类型]。比如,
public enum AMLauncherEventType {
LAUNCH,
CLEANUP
}
所以说到底,Yarn中的事件都是枚举类型。
EventHandler
Yarn定义了EventHandler接口,其中接收的泛型类型要求T extends Event
。
public interface EventHandler<T extends Event> {
void handle(T event);
}
比如,ApplicationMasterLauncher
就实现了EventHandler
并定义了handle方法
@Override
public synchronized void handle(AMLauncherEvent appEvent) {
AMLauncherEventType event = appEvent.getType();
RMAppAttempt application = appEvent.getAppAttempt();
switch (event) {
case LAUNCH:
launch(application);
break;
case CLEANUP:
cleanup(application);
break;
default:
break;
}
}
Dispatcher
Dispatcher的主要作用是通过不同的事件类型[EventType]找到相应的handler对事件[Event]进行处理。
Dispatcher接口中,两个基本方法register和getEventHandler。register在AsyncDispatcher使用之前就需要先注册eventType和对应的EventHandler,而getEventHandler方法主要则是把事件(event)放入eventQueue中。register底层就是一个KV Store[这里是HashMap]把Event和EentHandler对应起来,可以快速根据Event找到Handler。
public interface Dispatcher {
public static final String DISPATCHER_EXIT_ON_ERROR_KEY =
"yarn.dispatcher.exit-on-error";
public static final boolean DEFAULT_DISPATCHER_EXIT_ON_ERROR = false;
EventHandler getEventHandler();
void register(Class<? extends Enum> eventType, EventHandler handler);
}
AsyncDispatcher作为主要的事件分发器实现了Dispatcher接口,另外两个分发器类DrainDispatcher和InlineDispatcher都继承自AsyncDispatcher。
在RM中,几乎所有的事件都通过AsyncDispatcher进行事件的派发
Take ResourceManager as an Example
ResourceManager持有一个Dispatcher对象的实例。
private Dispatcher rmDispatcher;
// Register event handler for RmNodes
nodesListManager = new NodesListManager(rmContext);
rmDispatcher.register(NodesListManagerEventType.class, nodesListManager);