事件系统的设计方法

事件系统的设计方法

Scroll Down

前言

事件功能已经用过一段时间,在用户登入登出、充值、完成任务等场景下非常方便。最近抽时间阅读了源码:这套系统设计起来比较复杂,但完成后使用起来很简单。下面以“创建角色事件”为例走一遍完整流程,以下是 Java 代码。

事件实体GameEvent

/**
 * Base type of the events handled by the event system.
 * NOTE(review): this excerpt stops before the closing brace of the class, so further
 * members are presumably declared in the omitted part - confirm against the full source.
 */
public abstract class GameEvent {
	/** Dispatch type; ActorEvent passes a {@code DispatchType} constant for it. */
	private int dispatchType;
	/** Key of the event, see {@code EventKey}. */
	public String name;
	/**
	 * Creates an event without an explicit dispatch type; {@code dispatchType} keeps
	 * the Java default value 0.
	 * @param name		event name
	 */
	public GameEvent(String name) {
		this.name = name;
	}
	/**
	 * Creates an event with an explicit dispatch type.
	 * @param name			event name
	 * @param dispatchType	dispatch type to store on the event
	 */
	public GameEvent(String name, int dispatchType) {
		this.name = name;
		this.dispatchType = dispatchType;
	}

角色事件

/**
 * Base class for events that belong to one actor.
 * Every instance is constructed with {@code DispatchType.ACTOR} as its dispatch type.
 */
public abstract class ActorEvent extends GameEvent {
	/** Id of the actor; assigned once in the constructor and never changed. */
	private final long actorId;

	/**
	 * @param name		event key
	 * @param actorId	id of the actor the event belongs to
	 */
	public ActorEvent(String name, long actorId) {
		super(name, DispatchType.ACTOR);
		this.actorId = actorId;
	}

	/**
	 * @return the actor id, which serves as the unique id of this event
	 */
	@Override
	public long getUniqueId() {
		return actorId;
	}

	/**
	 * @return id of the actor the event belongs to
	 */
	public long getActorId() {
		return actorId;
	}
}

创建角色事件

/**
 * Actor-created event, keyed by {@code EventKey.ACTOR_CREATE_EVENT}.
 */
public class ActorCreateEvent extends ActorEvent {
	/**
	 * @param actorId	id of the actor passed on to ActorEvent
	 */
	public ActorCreateEvent(long actorId) {
		super(EventKey.ACTOR_CREATE_EVENT, actorId);
	}
}

EventKey 事件类型

/**
 * Event keys used as event names.
 * NOTE(review): the excerpt ends before the closing brace; further keys may follow.
 */
public interface EventKey {
	/**
	 * System: low-level event fired on a timer for online actors, see {@code @EventOnline}.
	 * The value contains two {@code %s} placeholders.
	 */
	public final static String ONLINE_EVENT = "ONLINE_EVENT_%s_%s";
	/** User: actor login event. */
	public final static String ACTOR_LOGIN = "ACTOR_LOGIN";
	/** Actor creation event. */
	public final static String ACTOR_CREATE_EVENT = "ACTOR_CREATE_EVENT";

消息派发类型线程池

/**
 * Identifiers of the thread pools that messages are dispatched to.
 */
public interface DispatchType {
	/** Thread reserved for operations/maintenance work. */
	int MAINTAIN = 1;
	/** Actor thread pool (used by most modules); data changes between actors must go through events. */
	int ACTOR = 2;
	/** User login. */
	int USER_LOGIN = 3;
	/** Account-related thread. */
	int SETTING = 4;
	/** Node-related thread. */
	int NODE = 5;

	/**
	 * Builds the per-process thread configuration.
	 * WARNING: be very careful before changing any entry whose thread count is 1.
	 *
	 * @return a new list with one ThreadInfo per dispatch type, in the order
	 *         maintain, actor, setting, user_login, node
	 */
	static List<ThreadInfo> getGameThreadInfo() {
		// The actor pool is the only entry sized from the CPU count; all others use one thread.
		int actorThreads = Runtime.getRuntime().availableProcessors() * 2 + 1;

		List<ThreadInfo> threads = new ArrayList<>();
		threads.add(ThreadInfo.valueOf("maintain", MAINTAIN, 1));
		threads.add(ThreadInfo.valueOf("actor", ACTOR, actorThreads));
		threads.add(ThreadInfo.valueOf("setting", SETTING, 1));
		threads.add(ThreadInfo.valueOf("user_login", USER_LOGIN, 1));
		threads.add(ThreadInfo.valueOf("node", NODE, 1));
		return threads;
	}
}

事件派发管理

创建角色方法

创建的ActorCreateEvent继承ActorEvent(ActorEvent再继承GameEvent),因此dispatchType = ACTOR,最后通过DispatchHelper派发事件

	/**
	 * Creates a new actor for the given account on the given server.
	 *
	 * <p>Steps: validate the name, reject the request when {@code actorDao.getActorId(uid, serverId)}
	 * is greater than 0, create the actor with the configured initial level and diamonds, set the
	 * pass-story reward expiry, queue the actor for persistence, send the initial rewards and VIP
	 * prerogatives, post an {@code ActorCreateEvent}, and write the OSS logs for non-robot actors.
	 *
	 * @param uid			account id
	 * @param platformId	platform id
	 * @param channelId		channel id
	 * @param serverId		server id
	 * @param actorName		requested actor name
	 * @param sex			sex flag passed through to the DAO
	 * @param avatarId		avatar id
	 * @param createIP		IP address recorded in the actor-info log
	 * @return the failed name check, {@code CREATE_ACTOR_FAIL}, or a success result carrying the new actor id
	 */
	@Override
	public TResult<Long> createActor(long uid, int platformId, int channelId, int serverId, String actorName, boolean sex, int avatarId,
			String createIP) {
		// The name is validated exactly once; a second call whose result was discarded has been removed.
		Result checkResult = this.checkActorName(actorName);
		if (checkResult.isFail()) {
			return TResult.valueOf(checkResult);
		}
		// Presumably a positive id means this account already owns an actor on the server - confirm with the DAO.
		if (actorDao.getActorId(uid, serverId) > 0) {
			return TResult.valueOf(CREATE_ACTOR_FAIL);
		}
		int level = globalConfigService.findGlobalConfig(GlobalConfigKey.ACTOR_INIT_LEVEL).findInt();
		int initSaveDiamond = globalConfigService.findGlobalConfig(GlobalConfigKey.SAVE_DIAMOND_INIT).findInt();
		Actor actor = actorDao.createActor(uid, platformId, channelId, serverId, actorName, sex, avatarId, level, initSaveDiamond);
		int passStoryRewardTime = globalConfigService.findGlobalConfig(GlobalConfigKey.PASS_STORY_TIME).findInt();
		// Widen to long before multiplying so the product cannot overflow int arithmetic.
		long expireTime = actor.getCreateTime() + (long) passStoryRewardTime * TimeUtils.ONE_HOUR_MILLISECOND;
		actor.setPassStoryRewardExpireTime(expireTime);
		dbQueue.updateQueue(actor);
		List<RewardObject> rewardList = globalConfigService.findGlobalObject(GlobalConfigKey.ACTOR_INIT_REWARD, RewardObjectListConfig.class).getVs();
		RewardHelper.sendRewardList(actor.getActorId(), rewardList, OperationType.ACTOR_CREATE);
		VipConfig vipConfig = VipConfigService.getVipConfig(actor.getVipLevel());
		prerogativeFacade.addPrerogative(actor.getActorId(), vipConfig.getPrerogativeIdList(), OperationType.ACTOR_CREATE);
		DispatchHelper.postEvent(new ActorCreateEvent(actor.getActorId()));
		// Robots are excluded from the OSS logs.
		if (!ActorHelper.isRobot(actor.getActorId())) {
			GameOssLogger.newUser(uid, platformId, channelId, serverId, actor.getActorId());
			GameOssLogger.actorInfo(actor, createIP);
		}
		return TResult.sucess(actor.getActorId());
	}

触发事件方法

触发@Event(name = EventKey.ACTOR_CREATE_EVENT)的创角事件方法

/**
 * Handler for {@code EventKey.ACTOR_CREATE_EVENT}.
 * Looks up the actor named by the event and, when its platform is WAN_BA or WAN_BA_IOS,
 * calls {@code ActorHelper.reportRegchar} with the actor's uid. Does nothing when the lookup fails.
 *
 * @param event	event carrying the actor id
 */
@Event(name = EventKey.ACTOR_CREATE_EVENT)
	public void onActorCreateEvent(ActorEvent event) {
		TResult<Actor> lookup = this.getActor(event.getActorId());
		if (lookup.isFail()) {
			return;
		}
		Actor created = lookup.item;
		boolean onWanBa = created.getPlatformId() == PlatformId.WAN_BA
				|| created.getPlatformId() == PlatformId.WAN_BA_IOS;
		if (!onWanBa) {
			return;
		}
		ActorHelper.reportRegchar(created.getUid());
	}

此外,活动模块还会在同一个onEvent方法里按事件名分别处理多种事件,例如:

// Receives events and branches on the event name so one listener can serve several event keys.
// NOTE(review): this excerpt is cut off inside the switch. No break is visible between the
// two cases shown, so the first case may fall through into the second - confirm in the full source.
@Override
	public void onEvent(GameEvent e) {
		// Activity configurations for this listener's type, loaded before branching on the event.
		List<ActivityOpenConfig> activityOpenConfigList = ActivityOpenConfigService.getActivityOpenConfigList(getType());
		switch (e.name) {
		case EventKey.GOD_EQUIPMENT_EVENT:
			// God-equipment summon event
			GodEquipmentEvent godEquipmentEvent = e.convert();
		case EventKey.GACHA_ACTIVITY_EVENT:
			// Gacha (lottery draw) event

线程调度DispatchHelper

**调度EventContext**

/**
 * Static facade for posting events.
 * {@link #init()} (annotated {@code @PostConstruct}) stores this instance in a static reference,
 * and {@link #postEvent(GameEvent)} forwards to the {@code EventContext} of that instance.
 */
@Component
public class DispatchHelper {
	/** Holds the instance registered by {@link #init()} so that static callers can reach it. */
	private static ObjectReference<DispatchHelper> ref = new ObjectReference<>();

	@Autowired
	private DispatchContext dispatchContext;
	@Autowired
	private EventContext eventContext;
	@Autowired
	private RouterContext routerContext;

	/** Publishes this instance through the static reference. */
	@PostConstruct
	protected void init() {
		ref.set(this);
	}

	/**
	 * Posts an event to the event context of the registered instance.
	 *
	 * @param e	event to post
	 */
	public static void postEvent(GameEvent e) {
		DispatchHelper helper = ref.get();
		helper.eventContext.post(e);
	}
}

事件派发环境

同一事件的监听按EventOrder的orderId从小到大排序,事件连同监听列表封装成EventDispatch后交给dispatchContext执行

/**
 * Event dispatch context: keeps the listener registries and hands posted events to the DispatchContext.
 *
 * <p>{@link #post(GameEvent)} appends events to a static queue. A single scheduled thread named
 * "eventQueueThread" drains that queue with a fixed delay of 10 ms, wraps every event together
 * with the listeners registered for its name in an EventDispatch and posts it to the DispatchContext.
 *
 * <p>NOTE(review): both registries are plain HashMaps holding ArrayLists, written by the register
 * methods and read by the queue thread. This looks safe only if all registration finishes before
 * the first event is posted - confirm against the startup sequence.
 */
public class EventContext {
	private static final Logger LOGGER = LoggerFactory.getLogger(EventContext.class);
	/** Event queue; static, so it is shared by every instance of this class. */
	private static final ConcurrentLinkedQueue<GameEvent> EVENT_QUEUE = new ConcurrentLinkedQueue<>();

	@Autowired
	DispatchContext dispatchContext;
	/** Registry of OnEventListener implementations, keyed by event name. */
	private final Map<String, List<OnEventListener>> onEventMaps = new HashMap<>();
	/** Registry of methods annotated with Event, keyed by event name. */
	private final Map<String, List<ASMMethod>> asmEventMaps = new HashMap<>();
	/** Executor that runs the queue-draining task. */
	private NamedScheduleExecutor eventExecutor;

	/**
	 * Starts the single-threaded task that drains the event queue: first run after 10 ms,
	 * then again 10 ms after each run has finished.
	 */
	@PostConstruct
	public void initEventQueue() {
		eventExecutor = new NamedScheduleExecutor(1, "eventQueueThread");
		eventExecutor.scheduleWithFixedDelay(this::drainEventQueue, 10, 10, TimeUnit.MILLISECONDS);
	}

	/**
	 * Polls the queue until it is empty and posts one EventDispatch per event.
	 * An exception ends the current pass; events still queued are handled by the next scheduled run.
	 */
	private void drainEventQueue() {
		try {
			GameEvent event;
			while ((event = EVENT_QUEUE.poll()) != null) {
				dispatchContext.post(createEventDispatch(event));
			}
		} catch (Exception ex) {
			// The exception goes in as the throwable argument so the stack trace is logged.
			LOGGER.error("event queue thread error", ex);
		}
	}

	/**
	 * Creates the dispatch package for an event.
	 *
	 * @param event	event to dispatch
	 * @return a dispatch holding the event and the listeners registered under its name;
	 *         either listener list is null when nothing is registered for that name
	 */
	private EventDispatch createEventDispatch(GameEvent event) {
		List<OnEventListener> eventListenerList = this.onEventMaps.get(event.getName());
		List<ASMMethod> asmEventList = this.asmEventMaps.get(event.getName());
		return new EventDispatch(event, eventListenerList, asmEventList);
	}

	/**
	 * Registers a method as handler for each of the given event names.
	 * The handler's order id comes from the EventOrder annotation on {@code clazz} (0 when absent);
	 * the handler list of every event name is re-sorted after each insertion.
	 *
	 * @param obj			instance the method is invoked on
	 * @param clazz			class used to read EventOrder and to obtain the MethodAccess
	 * @param method		handler method
	 * @param eventNames	event names to register under; null or empty names are logged and skipped
	 * @return true when the method was registered under at least one name
	 */
	public boolean register(Object obj, Class<?> clazz, Method method, String... eventNames) {
		EventOrder eventOrder = clazz.getAnnotation(EventOrder.class);
		int eventOrderId = eventOrder == null ? 0 : eventOrder.orderId();
		MethodAccess methodAccess = MethodAccess.get(clazz);
		ASMMethod asmMethod = ASMMethod.valueOf(method, methodAccess, obj, eventOrderId);
		boolean registered = false;
		for (String eventName : eventNames) {
			if (eventName == null || eventName.isEmpty()) {
				LOGGER.error("event name is null! method={}", method.getName());
				continue;
			}
			List<ASMMethod> handlers = asmEventMaps.computeIfAbsent(eventName, key -> new ArrayList<>());
			handlers.add(asmMethod);
			Collections.sort(handlers);
			registered = true;
		}
		return registered;
	}

	/**
	 * Registers a method when it carries the Event annotation.
	 * The method is made accessible before the annotation is looked up.
	 *
	 * @param obj		instance the method is invoked on
	 * @param clazz		class declaring the method
	 * @param method	candidate handler method
	 * @return true when the method is annotated and was registered under at least one name
	 */
	public boolean register(Object obj, Class<?> clazz, Method method) {
		method.setAccessible(true);
		// Plain event annotation. Annotation element values can never be null, so no null check is needed.
		Event event = method.getAnnotation(Event.class);
		if (event == null) {
			return false;
		}
		return register(obj, clazz, method, event.name());
	}

	/**
	 * Registers a listener under every event name it adds to the set passed to
	 * {@code registerEvent}. Null or empty names are skipped; the listener list of every
	 * event name is re-sorted after each insertion.
	 *
	 * @param listener	listener to register
	 */
	public void registeEventListener(OnEventListener listener) {
		Set<String> eventSet = new HashSet<>();
		listener.registerEvent(eventSet);
		for (String name : eventSet) {
			if (name == null || name.isEmpty()) {
				continue;
			}
			List<OnEventListener> listeners = onEventMaps.computeIfAbsent(name, key -> new ArrayList<>());
			listeners.add(listener);
			Collections.sort(listeners);
		}
	}

	/**
	 * Queues an event for dispatch; null is ignored.
	 *
	 * @param event	event to queue
	 */
	public void post(GameEvent event) {
		if (event != null) {
			// Events go through the queue first, so a consumer thread never posts into a full
			// ring buffer on its own and dead-locks the ring.
			EVENT_QUEUE.add(event);
		}
	}

	/**
	 * @return true when no event is waiting in the queue
	 */
	public static boolean isEmpty() {
		return EVENT_QUEUE.isEmpty();
	}

	/**
	 * @return number of queued events; ConcurrentLinkedQueue.size() traverses the queue,
	 *         so this is not a constant-time call
	 */
	public static int getQueueSize() {
		return EVENT_QUEUE.size();
	}
}

事件监听器

添加和接收事件

/**
 * Listener that declares which events it wants and receives them through {@link #onEvent(GameEvent)}.
 * Listeners are ordered by the {@code orderId} of the EventOrder annotation on their class
 * (0 when the annotation is absent), ascending, with the class name as tie-breaker.
 */
public interface OnEventListener extends Comparable<OnEventListener> {
	/**
	 * Compares two listeners by order id first and by class name second.
	 *
	 * @param o	listener to compare with
	 * @return -1 or 1 when the order ids differ, otherwise the result of comparing the class names
	 */
	@Override
	default int compareTo(OnEventListener o) {
		EventOrder ownOrder = this.getClass().getAnnotation(EventOrder.class);
		int ownOrderId = ownOrder == null ? 0 : ownOrder.orderId();

		EventOrder otherOrder = o.getClass().getAnnotation(EventOrder.class);
		int otherOrderId = otherOrder == null ? 0 : otherOrder.orderId();

		int byOrderId = Integer.compare(ownOrderId, otherOrderId);
		if (byOrderId != 0) {
			return byOrderId;
		}
		return this.getClass().getName().compareTo(o.getClass().getName());
	}

	/**
	 * Collects the names of the events this listener wants to receive.
	 *
	 * @param eventSet	set the implementation adds event names to, see {@code EventKey}
	 */
	public void registerEvent(Set<String> eventSet);

	/**
	 * Receives an event.
	 *
	 * @param event	the event
	 */
	public void onEvent(GameEvent event);
}

ASM字节码方法

/**
 * Handler method bound to an instance and invoked through a MethodAccess by method index.
 * Instances are ordered by {@code orderId} ascending, with the class name of the bound
 * instance as tie-breaker.
 */
public class ASMMethod implements Comparable<ASMMethod> {
	/** Method access object used for the invocation. */
	private MethodAccess access;
	/** Index of the method inside {@link #access}. */
	private int index;
	/** Object the method is invoked on. */
	private Object instance;
	/** Execution order id. */
	private int orderId;

	/**
	 * Creates a handler with order id 0.
	 *
	 * @param method		method object
	 * @param methodAccess	method access object of the declaring class
	 * @param instance		instance the method is invoked on
	 * @return the new handler
	 */
	public static ASMMethod valueOf(Method method, MethodAccess methodAccess, Object instance) {
		return valueOf(method, methodAccess, instance, 0);
	}

	/**
	 * Creates a handler with an explicit order id.
	 *
	 * @param method		method object; its name and parameter types select the method index
	 * @param methodAccess	method access object of the declaring class
	 * @param instance		instance the method is invoked on
	 * @param orderId		execution order id
	 * @return the new handler
	 */
	public static ASMMethod valueOf(Method method, MethodAccess methodAccess, Object instance, int orderId) {
		ASMMethod created = new ASMMethod();
		created.access = methodAccess;
		created.instance = instance;
		created.orderId = orderId;
		created.index = methodAccess.getIndex(method.getName(), method.getParameterTypes());
		return created;
	}

	/**
	 * Invokes the bound method on the bound instance.
	 *
	 * @param args	arguments passed to the method
	 * @return whatever the MethodAccess invocation returns
	 */
	public Object invoke(Object... args) {
		return access.invoke(instance, index, args);
	}

	/**
	 * @return execution order id
	 */
	public int getOrderId() {
		return orderId;
	}

	/**
	 * @return class name of the bound instance
	 */
	@Override
	public String toString() {
		return instance.getClass().getName();
	}

	/**
	 * Orders by order id first and by the class name of the bound instance second.
	 */
	@Override
	public int compareTo(ASMMethod o) {
		int byOrderId = Integer.compare(this.orderId, o.orderId);
		if (byOrderId != 0) {
			return byOrderId;
		}
		String ownClassName = this.instance.getClass().getName();
		return ownClassName.compareTo(o.instance.getClass().getName());
	}
}

事件执行线程

线程命名的执行器封装类

/**
 * Wrapper around a ScheduledExecutorService whose threads are created by a
 * NamedThreadFactory built from the given name. Every method delegates to the wrapped executor.
 * NOTE(review): no shutdown method is visible here; callers presumably shut down through
 * {@link #getExecutor()} - confirm.
 */
public class NamedScheduleExecutor {
	/** The wrapped scheduled executor. */
	private ScheduledExecutorService executor;
	/**
	 * @param poolSize	pool size passed to {@code Executors.newScheduledThreadPool}
	 * @param name		name passed to the NamedThreadFactory
	 */
	public NamedScheduleExecutor(int poolSize, final String name) {
		executor = Executors.newScheduledThreadPool(poolSize, new NamedThreadFactory(name));
	}
	/**
	 * @return the wrapped executor
	 */
	public ScheduledExecutorService getExecutor() {
		return executor;
	}
	/**
	 * Submits a task to the wrapped executor.
	 * @param run	task to run
	 * @return future of the submitted task
	 */
	public Future<?> submit(Runnable run) {
		return executor.submit(run);
	}
	/**
	 * Schedules a one-shot task on the wrapped executor.
	 * @param run		task to run
	 * @param delay		delay before the run
	 * @param timeUnit	unit of {@code delay}
	 * @return future of the scheduled task
	 */
	public ScheduledFuture<?> schedule(Runnable run, int delay, TimeUnit timeUnit) {
		return executor.schedule(run, delay, timeUnit);
	}
	/**
	 * Schedules a periodic task at a fixed rate on the wrapped executor.
	 * @param run			task to run
	 * @param initialDelay	delay before the first run
	 * @param period		period between the starts of successive runs
	 * @param unit			unit of {@code initialDelay} and {@code period}
	 * @return future of the scheduled task
	 */
	public ScheduledFuture<?> scheduleAtFixedRate(Runnable run, int initialDelay, int period, TimeUnit unit) {
		return executor.scheduleAtFixedRate(run, initialDelay, period, unit);
	}
	/**
	 * Schedules a periodic task with a fixed delay on the wrapped executor.
	 * @param run			task to run
	 * @param initialDelay	delay before the first run
	 * @param delay			delay between the end of one run and the start of the next
	 * @param unit			unit of {@code initialDelay} and {@code delay}
	 * @return future of the scheduled task
	 */
	public ScheduledFuture<?> scheduleWithFixedDelay(Runnable run, int initialDelay, int delay, TimeUnit unit) {
		return executor.scheduleWithFixedDelay(run, initialDelay, delay, unit);
	}
}