SpringBoot实现即时通讯
功能简述
- 好友管理
- 群组管理
- 聊天模式:私聊、群聊
- 消息类型:系统消息、文本、语音、图片、视频
- 会话列表、发送消息、接收消息
核心代码
package com.qiangesoft.im.core;import com.alibaba.fastjson2.JSONObject;
import com.qiangesoft.im.constant.ChatTypeEnum;
import com.qiangesoft.im.constant.ImMessageBodyTypeEnum;
import com.qiangesoft.im.service.IImGroupUserService;
import com.qiangesoft.im.util.SpringUtil;
import com.qiangesoft.im.pojo.dto.PingDTO;
import com.qiangesoft.im.pojo.vo.PongVO;
import com.qiangesoft.im.pojo.vo.ImMessageVO;
import com.qiangesoft.im.pojo.dto.ImMessageDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@ServerEndpoint("/ws/im/{userId}")
@Component
public class ImWebSocketServer {private static final ConcurrentHashMap<Long, ImWebSocketServer> WEBSOCKET_MAP = new ConcurrentHashMap<>();private Session session;@OnOpenpublic void onOpen(Session session, @PathParam("userId") Long userId) {this.session = session;if (WEBSOCKET_MAP.containsKey(userId)) {WEBSOCKET_MAP.remove(userId);WEBSOCKET_MAP.put(userId, this);} else {WEBSOCKET_MAP.put(userId, this);}log.info("User [{}] connection opened=====>", userId);PongVO pongVO = new PongVO();pongVO.setType(ImMessageBodyTypeEnum.SUCCESS.getCode());pongVO.setContent("连接成功");pongVO.setTimestamp(System.currentTimeMillis());doSendMessage(JSONObject.toJSONString(pongVO));}@OnMessagepublic void onMessage(Session session, @PathParam("userId") Long userId, String message) {log.info("User [{}] send a message, content is [{}]", userId, message);PingDTO pingDTO = null;try {pingDTO = JSONObject.parseObject(message, PingDTO.class);} catch (Exception e) {log.error("消息解析失败");e.printStackTrace();}if (pingDTO == null || !ImMessageBodyTypeEnum.PING.getCode().equals(pingDTO.getType())) {sendInValidMessage();return;}PongVO pongVO = new PongVO();pongVO.setType(ImMessageBodyTypeEnum.PONG.getCode());pongVO.setContent("已收到消息~");pongVO.setTimestamp(System.currentTimeMillis());doSendMessage(JSONObject.toJSONString(pongVO));}@OnClosepublic void onClose(Session session, @PathParam("userId") Long userId) {close(session, userId);log.info("User {} connection is closed<=====", userId);}@OnErrorpublic void onError(Session session, Throwable error) {error.printStackTrace();}public static void sendMessage(ImMessageDTO message) {String chatType = message.getChatType();if (ChatTypeEnum.GROUP.getCode().equals(chatType)) {sendGroupMessage(message);}if (ChatTypeEnum.PERSON.getCode().equals(chatType)) {sendPersonMessage(message);}}public static void offline(Long userId) {ImWebSocketServer webSocketServer = WEBSOCKET_MAP.get(userId);if (webSocketServer != null) {PongVO pongVO = new PongVO();pongVO.setType(ImMessageBodyTypeEnum.OFFLINE.getCode());pongVO.setContent("设备被挤下线");pongVO.setTimestamp(System.currentTimeMillis());webSocketServer.doSendMessage(JSONObject.toJSONString(pongVO));close(webSocketServer.session, userId);}}public static void close(Session session, Long userId) {if (WEBSOCKET_MAP.containsKey(userId)) {try {session.close();} catch (IOException e) {e.printStackTrace();}WEBSOCKET_MAP.remove(userId);}}public static Map<Long, ImWebSocketServer> getOnlineUser() {return WEBSOCKET_MAP;}private void sendInValidMessage() {PongVO pongVO = new PongVO();pongVO.setType(ImMessageBodyTypeEnum.FAIL.getCode());pongVO.setContent("无效消息");pongVO.setTimestamp(System.currentTimeMillis());doSendMessage(JSONObject.toJSONString(pongVO));}private static void sendGroupMessage(ImMessageDTO message) {Long receiverId = message.getReceiverId();IImGroupUserService groupUserService = SpringUtil.getBean(IImGroupUserService.class);List<Long> userIdList = groupUserService.listUserIdByGroupId(receiverId);MessageHandlerService messageHandlerService = SpringUtil.getBean(MessageHandlerService.class);ImMessageVO messageVO = messageHandlerService.buildVo(message);PongVO pongVO = new PongVO();pongVO.setType(ImMessageBodyTypeEnum.MESSAGE.getCode());pongVO.setContent(messageVO);pongVO.setTimestamp(System.currentTimeMillis());String messageStr = JSONObject.toJSONString(pongVO);for (Long userId : userIdList) {ImWebSocketServer webSocketServer = WEBSOCKET_MAP.get(userId);if (webSocketServer != null) {if (!userId.equals(message.getSenderId())) {webSocketServer.doSendMessage(messageStr);}}}}private static void sendPersonMessage(ImMessageDTO message) {Long receiverId = message.getReceiverId();ImWebSocketServer webSocketServer = WEBSOCKET_MAP.get(receiverId);if (webSocketServer != null) {MessageHandlerService messageHandlerService = SpringUtil.getBean(MessageHandlerService.class);ImMessageVO messageVO = messageHandlerService.buildVo(message);PongVO pongVO = new PongVO();pongVO.setType(ImMessageBodyTypeEnum.MESSAGE.getCode());pongVO.setContent(messageVO);pongVO.setTimestamp(System.currentTimeMillis());webSocketServer.doSendMessage(JSONObject.toJSONString(pongVO));}}private void doSendMessage(String message) {try {this.session.getBasicRemote().sendText(message);} catch (IOException e) {e.printStackTrace();}}
}