package com.kdgcsoft.sc.rdc.messenger;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.parser.Feature;
import com.alibaba.fastjson.parser.ParserConfig;
import com.kdgcsoft.sc.rdc.messenger.Messenger;
import com.kdgcsoft.sc.rdc.messenger.exception.MessengerException;
import com.kdgcsoft.sc.rdc.messenger.helper.Cnst;
import com.kdgcsoft.sc.rdc.messenger.helper.DirHelper;
import com.kdgcsoft.sc.rdc.messenger.helper.FileWriter;
import com.kdgcsoft.sc.rdc.messenger.helper.FileWriterHelper;
import com.kdgcsoft.sc.rdc.messenger.interfaces.IMessengerComponent;
import com.kdgcsoft.sc.rdc.messenger.message.CmdMessage;
import com.kdgcsoft.sc.rdc.messenger.message.CmdMessageHandlerDef;
import com.kdgcsoft.sc.rdc.messenger.message.DefaultCMD;
import com.kdgcsoft.sc.rdc.messenger.message.MessageType;
import com.kdgcsoft.sc.rdc.messenger.message.MsgStatus;
import com.kdgcsoft.sc.rdc.messenger.message.body.FileBlockBody;
import com.kdgcsoft.sc.rdc.messenger.message.body.FileBlockResendBody;
import com.kdgcsoft.sc.rdc.messenger.message.body.FileBody;
import com.kdgcsoft.sc.rdc.messenger.message.body.FileHeaderBody;
import com.kdgcsoft.sc.rdc.messenger.message.handler.AbstractMessageHandler;
import com.kdgcsoft.sc.rdc.messenger.message.listener.FileQueueListener;
import com.kdgcsoft.sc.rdc.messenger.message.listener.MsgQueueListener;
import com.kdgcsoft.sc.rdc.messenger.message.listener.RpcQueueListener;
import com.kdgcsoft.sc.rdc.messenger.model.MessengerNode;
import com.kdgcsoft.sc.rdc.messenger.model.RPCMessageResponse;
import com.kdgcsoft.sc.rdc.messenger.sender.DefaultMessageSender;
import com.kdgcsoft.sc.rdc.messenger.sender.FileMessageSender;
import com.kdgcsoft.sc.rdc.messenger.sender.IMessageSender;
import com.kdgcsoft.sc.rdc.messenger.sender.MsgMessageSender;
import com.kdgcsoft.sc.rdc.messenger.sender.RPCMessageSender;
import com.kdgcsoft.sc.rdc.messenger.store.DBStoreServer;
import com.kdgcsoft.sc.rdc.messenger.store.entity.DbReceiveMessage;
import com.kdgcsoft.sc.rdc.messenger.thread.ThreadExecutor;
import com.kdgcsoft.sc.rdc.messenger.timertask.FileCleanTask;
import com.kdgcsoft.sc.rdc.messenger.timertask.FileReRequestTask;
import com.kdgcsoft.sc.rdc.messenger.timertask.MsgCleanTask;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionNameStrategy;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.util.ObjectUtils;

/* loaded from: input_file:com/kdgcsoft/sc/rdc/messenger/Messenger.class */
public enum Messenger {
    INST;

    private int retryCount;
    private MessengerNode connectNode;
    private MessengerNode centerNode;
    private String dbUserName;
    private String dbPassword;
    private String baseDir;
    private boolean msgIdempotent;
    private static final Log log = LogFactory.get();
    private static Map<String, CachingConnectionFactory> connectMap = new ConcurrentHashMap();
    private static Map<String, RabbitTemplate> templateMap = new ConcurrentHashMap();
    private static List<SimpleMessageListenerContainer> selflistener = new ArrayList();
    private static Map<String, CmdMessageHandlerDef> CMDMap = new HashMap();
    private volatile MessengerStatus status = MessengerStatus.NOT_INIT;
    private int fileMaxSaveDays = 10;

    Messenger() {
    }

    private void loadProperties() {
        TimeZone.setDefault(TimeZone.getTimeZone("GMT+8"));
    }

    private static void registerDefaultCMDMessageHandler(DefaultCMD defaultCMD, AbstractMessageHandler abstractMessageHandler) {
        CMDMap.put(defaultCMD.name(), new CmdMessageHandlerDef(defaultCMD.name(), defaultCMD.getMessageType(), abstractMessageHandler));
    }

    public MessageType getMessageType(String str) {
        CmdMessageHandlerDef cmdMessageHandlerDef = CMDMap.get(str);
        if (cmdMessageHandlerDef != null) {
            return cmdMessageHandlerDef.getMessageType();
        }
        log.error("{}指令未注册", new Object[]{str});
        return null;
    }

    public AbstractMessageHandler getMessageHandler(String str) {
        CmdMessageHandlerDef cmdMessageHandlerDef = CMDMap.get(str);
        if (cmdMessageHandlerDef != null) {
            return cmdMessageHandlerDef.getMessageHandler();
        }
        log.error("{}指令未注册", new Object[]{str});
        return null;
    }

    public Messenger init(MessengerSetting messengerSetting) {
        this.msgIdempotent = messengerSetting.isMsgIdempotent();
        Log log2 = log;
        Object[] objArr = new Object[1];
        objArr[0] = this.msgIdempotent ? "是" : "否";
        log2.info("使用消息幂等功能：{}", objArr);
        this.connectNode = messengerSetting.getMessengerNode();
        this.centerNode = messengerSetting.getCenterNode();
        this.baseDir = messengerSetting.getBaseDir();
        this.retryCount = messengerSetting.getRetryCount();
        this.dbUserName = messengerSetting.getDbUserName() == null ? "" : messengerSetting.getDbUserName();
        this.dbPassword = messengerSetting.getDbPassword() == null ? "" : messengerSetting.getDbPassword();
        this.fileMaxSaveDays = messengerSetting.getFileMaxSaveDays();
        for (Map.Entry<String, CmdMessageHandlerDef> entry : messengerSetting.getAllMessageHandler().entrySet()) {
            if (CMDMap.containsKey(entry.getKey())) {
                log.warn("{}指令已经存在", new Object[]{entry.getKey()});
            } else {
                CMDMap.put(entry.getKey(), entry.getValue());
            }
        }
        for (IMessengerComponent iMessengerComponent : messengerSetting.getComponentList()) {
            List<CmdMessageHandlerDef> cmdHandler = iMessengerComponent.getCmdHandler();
            if (CollUtil.isNotEmpty(cmdHandler)) {
                for (CmdMessageHandlerDef cmdMessageHandlerDef : cmdHandler) {
                    if (CMDMap.containsKey(cmdMessageHandlerDef.getCommand())) {
                        log.warn("{}指令已经存在", new Object[]{cmdMessageHandlerDef.getCommand()});
                    } else {
                        log.debug("{}注册指令[{}]-[{}]-[{}]", new Object[]{iMessengerComponent.getClass().getSimpleName(), cmdMessageHandlerDef.getMessageType().name(), cmdMessageHandlerDef.getCommand(), cmdMessageHandlerDef.getMessageHandler()});
                        CMDMap.put(cmdMessageHandlerDef.getCommand(), cmdMessageHandlerDef);
                    }
                }
            }
        }
        log.info("初始化存储目录：{}", new Object[]{this.baseDir});
        DirHelper.INST.init(this.baseDir);
        this.status = MessengerStatus.INITED;
        return this;
    }

    public void start() throws MessengerException {
        switch (this.status) {
            case NOT_INIT:
                log.error("还未初始化,请使用init()方法进行初始化", new Object[0]);
                return;
            case STARTING:
                log.error("Messenger正在启动中,不能执行start操作", new Object[0]);
                return;
            case STARTED:
                log.info("Messenger服务正在运行,请直接使用", new Object[0]);
                return;
            case STOPPING:
                log.error("Messenger正在停止中,不能执行start操作", new Object[0]);
                return;
            default:
                log.info("Messenger...启动中...", new Object[0]);
                this.status = MessengerStatus.STARTING;
                ParserConfig.getGlobalInstance().setAutoTypeSupport(true);
                try {
                    log.debug("加载基础配置信息", new Object[0]);
                    loadProperties();
                    log.debug("初始化本地数据库相关资源", new Object[0]);
                    initStore();
                    loadReceiveFiles();
                    log.debug("初始化连接", new Object[0]);
                    getConnectionFactory(this.connectNode);
                    log.debug("初始化定时任务", new Object[0]);
                    intTimerTask();
                    this.status = MessengerStatus.STARTED;
                    log.debug("创建自身MQ队列监听", new Object[0]);
                    regeditListener(this.connectNode);
                    log.info("Messenger...启动完成.", new Object[0]);
                    return;
                } catch (Exception e) {
                    this.status = MessengerStatus.STOPPED;
                    log.error(e, "Messenger...启动失败,服务已停止.", new Object[0]);
                    throw new MessengerException("Messenger...启动失败,服务已停止.", e);
                }
        }
    }

    private void loadReceiveFiles() throws IOException {
        log.debug("加载未完成的文件列表", new Object[0]);
        List<DbReceiveMessage> notComplateFileMessage = DBStoreServer.INST.getNotComplateFileMessage();
        log.debug("未完成文件数量:{}", new Object[]{Integer.valueOf(notComplateFileMessage.size())});
        Iterator<DbReceiveMessage> it = notComplateFileMessage.iterator();
        while (it.hasNext()) {
            FileWriterHelper.INST.addFileWriter(it.next().getCmdMessage());
        }
    }

    private void regeditListener(MessengerNode messengerNode) throws MessengerException {
        CachingConnectionFactory connectionFactory = getConnectionFactory(messengerNode);
        String buildQueueName = Cnst.buildQueueName(messengerNode, MessageType.FILE);
        String buildQueueName2 = Cnst.buildQueueName(messengerNode, MessageType.MSG);
        String buildQueueName3 = Cnst.buildQueueName(messengerNode, MessageType.RPC);
        try {
            SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
            simpleMessageListenerContainer.setQueueNames(new String[]{buildQueueName});
            simpleMessageListenerContainer.setExposeListenerChannel(true);
            simpleMessageListenerContainer.setPrefetchCount(5);
            simpleMessageListenerContainer.setConcurrentConsumers(10);
            simpleMessageListenerContainer.setMessageListener(new FileQueueListener());
            simpleMessageListenerContainer.start();
            selflistener.add(simpleMessageListenerContainer);
            SimpleMessageListenerContainer simpleMessageListenerContainer2 = new SimpleMessageListenerContainer(connectionFactory);
            simpleMessageListenerContainer2.setQueueNames(new String[]{buildQueueName2});
            simpleMessageListenerContainer2.setExposeListenerChannel(true);
            if ("LOG_CENTER".equalsIgnoreCase(messengerNode.getNodeid())) {
                simpleMessageListenerContainer2.setPrefetchCount(20);
                simpleMessageListenerContainer2.setConcurrentConsumers(10);
            } else {
                simpleMessageListenerContainer2.setPrefetchCount(10);
                simpleMessageListenerContainer2.setConcurrentConsumers(10);
            }
            simpleMessageListenerContainer2.setMessageListener(new MsgQueueListener());
            simpleMessageListenerContainer2.start();
            selflistener.add(simpleMessageListenerContainer2);
            SimpleMessageListenerContainer simpleMessageListenerContainer3 = new SimpleMessageListenerContainer(connectionFactory);
            simpleMessageListenerContainer3.setQueueNames(new String[]{buildQueueName3});
            simpleMessageListenerContainer3.setExposeListenerChannel(true);
            simpleMessageListenerContainer3.setPrefetchCount(1);
            simpleMessageListenerContainer3.setConcurrentConsumers(1);
            simpleMessageListenerContainer3.setMessageListener(new MessageListenerAdapter(new RpcQueueListener()));
            simpleMessageListenerContainer3.start();
            selflistener.add(simpleMessageListenerContainer3);
        } catch (Exception e) {
            log.error(e, "创建队列监听出错,{},相关信息:\r\n[{}],[{}],[{}],[{}]", new Object[]{e.getMessage(), buildQueueName, buildQueueName2, buildQueueName3});
            throw new MessengerException("创建队列监听出错", e);
        }
    }

    private void intTimerTask() {
        log.debug("添加定时任务FileReRequestTask,延迟{},频率{},单位{}", new Object[]{5, 30, TimeUnit.MINUTES.name()});
        ThreadExecutor.excuteAtFixedRate(new FileReRequestTask(), 5L, 30L, TimeUnit.MINUTES);
        log.debug("添加定时任务DBCleanTask,延迟{},频率{},单位{}", new Object[]{5, 60, TimeUnit.MINUTES.name()});
        ThreadExecutor.excuteAtFixedRate(new MsgCleanTask(), 5L, 60L, TimeUnit.MINUTES);
        log.debug("添加定时任务FileCleanTask,延迟{},频率{},单位{}", new Object[]{1, 8, TimeUnit.HOURS.name()});
        ThreadExecutor.excuteAtFixedRate(new FileCleanTask(), 1L, 8L, TimeUnit.HOURS);
    }

    private void initStore() throws MessengerException {
        DBStoreServer.INST.init(this.dbUserName, this.dbPassword, this.connectNode.getNodeid());
    }

    private CachingConnectionFactory getConnectionFactory(MessengerNode messengerNode) {
        return connectMap.containsKey(messengerNode.getNodeid()) ? connectMap.get(messengerNode.getNodeid()) : buildConnectionFactory(messengerNode);
    }

    private CachingConnectionFactory buildConnectionFactory(final MessengerNode messengerNode) {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setBeanName(messengerNode.getNodeid());
        cachingConnectionFactory.setAddresses(messengerNode.getAddress());
        cachingConnectionFactory.setUsername(messengerNode.getUsername());
        cachingConnectionFactory.setPassword(messengerNode.getPassword());
        cachingConnectionFactory.setVirtualHost(messengerNode.getVirtualHost());
        cachingConnectionFactory.setConnectionNameStrategy(new ConnectionNameStrategy() { // from class: com.kdgcsoft.sc.rdc.messenger.Messenger.1
            public String obtainNewConnectionName(ConnectionFactory connectionFactory) {
                return "[" + messengerNode.getNodeid() + "]-" + ObjectUtils.getIdentityHexString(connectionFactory);
            }
        });
        cachingConnectionFactory.setPublisherConfirms(true);
        cachingConnectionFactory.setPublisherReturns(true);
        cachingConnectionFactory.setChannelCacheSize(100);
        log.debug("创建MQ连接工厂:[{}][{}]", new Object[]{messengerNode.getNodeid(), messengerNode.getAddress()});
        connectMap.put(messengerNode.getNodeid(), cachingConnectionFactory);
        declareQueues(messengerNode);
        return cachingConnectionFactory;
    }

    private void declareQueues(MessengerNode messengerNode) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(getConnectionFactory(messengerNode));
        Queue queue = new Queue(Cnst.buildQueueName(messengerNode, MessageType.FILE));
        Queue queue2 = new Queue(Cnst.buildQueueName(messengerNode, MessageType.MSG));
        HashMap hashMap = new HashMap();
        hashMap.put("x-message-ttl", Cnst.DEFAULT_RPC_QUEUE_TIMEOUT);
        Queue queue3 = new Queue(Cnst.buildQueueName(messengerNode, MessageType.RPC), true, false, false, hashMap);
        log.debug("创建节点队列列表:{}", new Object[]{messengerNode.toString()});
        boolean z = false;
        while (!z) {
            try {
                rabbitAdmin.declareQueue(queue2);
                rabbitAdmin.declareQueue(queue);
                rabbitAdmin.declareQueue(queue3);
                z = true;
            } catch (Exception e) {
                log.error("节点队列声明出错:{},[{}s]后重试...", new Object[]{e.getMessage(), Long.valueOf(Cnst.CONNECT_ERROR_RETRY_DELAY.longValue() / 1000)});
                try {
                    Thread.sleep(Cnst.CONNECT_ERROR_RETRY_DELAY.longValue());
                } catch (InterruptedException e2) {
                    log.error(e2);
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    private RabbitTemplate buildTemplate(MessengerNode messengerNode, boolean z) {
        String str = messengerNode.getNodeid() + "." + (z ? Cnst.RPC_TEMPLATE : Cnst.AMQP_TEMPLATE);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(getConnectionFactory(messengerNode));
        rabbitTemplate.setEncoding("UTF-8");
        rabbitTemplate.setMandatory(true);
        if (z) {
            rabbitTemplate.setRoutingKey(Cnst.buildQueueName(messengerNode, MessageType.RPC));
            rabbitTemplate.setUseTemporaryReplyQueues(true);
        } else {
            rabbitTemplate.setReturnCallback((message, i, str2, str3, str4) -> {
                CmdMessage cmdMessage = (CmdMessage) JSON.parseObject(message.getBody(), CmdMessage.class, new Feature[0]);
                DBStoreServer.INST.updateSendMsgStatus(cmdMessage, MsgStatus.ERROR);
                log.error("{}消息发送失败:[{}][{}]", new Object[]{cmdMessage.getBaseInfo(), str4, str2});
            });
            rabbitTemplate.setConfirmCallback((correlationData, z2, str5) -> {
                if (z2 || correlationData == null) {
                    return;
                }
                log.error("消息发送失败:[{}] [{}]更新状态为{},等待定时任务重发", new Object[]{correlationData.getId(), str5, MsgStatus.ERROR});
                DBStoreServer.INST.updateSendMsgStatus(correlationData.getId(), MsgStatus.ERROR);
            });
        }
        log.debug("创建RabbitTemplate:[{}]", new Object[]{str});
        templateMap.put(str, rabbitTemplate);
        return rabbitTemplate;
    }

    private RabbitTemplate getTemplate(MessengerNode messengerNode, boolean z) throws MessengerException {
        String str = messengerNode.getNodeid() + "." + (z ? Cnst.RPC_TEMPLATE : Cnst.AMQP_TEMPLATE);
        return templateMap.containsKey(str) ? templateMap.get(str) : buildTemplate(messengerNode, z);
    }

    public void sendMsg(CmdMessage cmdMessage) {
        if (this.status != MessengerStatus.STARTED) {
            log.error("Messenger服务当前状态不可用{}", new Object[]{this.status.name()});
            return;
        }
        fillCMDMessageQueueNameAndMessageType(cmdMessage);
        IMessageSender iMessageSender = null;
        MessageType messageType = getMessageType(cmdMessage.getCommand());
        if (messageType != null) {
            switch (messageType) {
                case FILE:
                    iMessageSender = FileMessageSender.INST;
                    break;
                case MSG:
                    iMessageSender = MsgMessageSender.INST;
                    break;
                case RPC:
                    log.error("sendMsg方法不支持发送RPC消息,请换用sendRPCMsg方法", new Object[0]);
                    return;
                default:
                    iMessageSender = DefaultMessageSender.INST;
                    break;
            }
        } else {
            log.error("指令:{}未绑定MessageType", new Object[]{cmdMessage.getCommand()});
        }
        if (iMessageSender == null) {
            log.warn("{}没有找到对应的发送处理类!请检查指令绑定信息,更新本地状态为:{}", new Object[]{cmdMessage.getBaseInfo(), MsgStatus.UNDEAL.toString()});
            DBStoreServer.INST.updateSendMsgStatus(cmdMessage, MsgStatus.UNDEAL);
            return;
        }
        if (messageType == MessageType.FILE && cmdMessage.isStore()) {
            DBStoreServer.INST.saveSendMsg(cmdMessage);
        }
        try {
            MsgStatus sendMsg = iMessageSender.sendMsg(getTemplate(cmdMessage.getMessageTrace().getReceiver(), false), cmdMessage);
            if (cmdMessage.isStore()) {
                log.debug("{}发送完成!,更新本地状态为:{}", new Object[]{cmdMessage.getBaseInfo(), sendMsg.name()});
                DBStoreServer.INST.updateSendMsgStatus(cmdMessage, sendMsg);
            }
        } catch (Exception e) {
            if (cmdMessage.isStore()) {
                log.error(e, "{}消息发送到MQ失败!,更新本地状态为:{}", new Object[]{cmdMessage.getBaseInfo(), MsgStatus.ERROR.toString()});
                DBStoreServer.INST.updateSendMsgStatus(cmdMessage, MsgStatus.ERROR);
            }
        }
    }

    public RPCMessageResponse sendRPCMsg(CmdMessage cmdMessage) {
        if (this.status != MessengerStatus.STARTED) {
            log.error("Messenger服务当前状态不可用{}", new Object[]{this.status.name()});
            return null;
        }
        fillCMDMessageQueueNameAndMessageType(cmdMessage);
        RPCMessageSender rPCMessageSender = RPCMessageSender.INST;
        log.debug("{}开始发送RPC信息", new Object[]{cmdMessage.getBaseInfo()});
        try {
            return rPCMessageSender.sendMsg(getTemplate(cmdMessage.getMessageTrace().getReceiver(), true), cmdMessage);
        } catch (Exception e) {
            log.error(e, "{}发送RPC信息失败!", new Object[]{cmdMessage.getBaseInfo()});
            return new RPCMessageResponse("远程调用失败！", e);
        }
    }

    CmdMessage fillCMDMessageQueueNameAndMessageType(CmdMessage cmdMessage) {
        MessageType messageType = getMessageType(cmdMessage.getCommand());
        cmdMessage.setQueueName(Cnst.buildQueueName(cmdMessage.getMessageTrace().getReceiver(), messageType));
        cmdMessage.setMessageType(messageType);
        return cmdMessage;
    }

    private void addShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            log.info("程序退出,关闭服务", new Object[0]);
            INST.shutdown();
        }));
    }

    public boolean shutdown() {
        switch (AnonymousClass2.$SwitchMap$com$kdgcsoft$sc$rdc$messenger$MessengerStatus[this.status.ordinal()]) {
            case 4:
                log.error("Messenger服务正在停止中,不可重复调用shutdown", new Object[0]);
                return false;
            case Cnst.DEFAULT_RETRY_COUNT /* 5 */:
                log.info("Messenger服务已停止", new Object[0]);
                return true;
            default:
                log.info("关闭队列监听...开始", new Object[0]);
                Iterator<SimpleMessageListenerContainer> it = selflistener.iterator();
                while (it.hasNext()) {
                    it.next().shutdown();
                }
                boolean z = false;
                while (!z) {
                    Iterator<SimpleMessageListenerContainer> it2 = selflistener.iterator();
                    while (true) {
                        if (!it2.hasNext()) {
                            break;
                        }
                        if (it2.next().isRunning()) {
                            z = false;
                        } else {
                            z = true;
                        }
                    }
                }
                log.info("关闭队列监听...结束", new Object[0]);
                log.info("关闭连接工厂...开始", new Object[0]);
                Iterator<CachingConnectionFactory> it3 = connectMap.values().iterator();
                while (it3.hasNext()) {
                    it3.next().destroy();
                }
                log.info("关闭连接工厂...结束", new Object[0]);
                log.info("关闭Messenger...开始", new Object[0]);
                this.status = MessengerStatus.STOPPING;
                log.info("关闭所有的线程池...开始", new Object[0]);
                ThreadExecutor.shutdown();
                log.info("关闭所有的线程池...结束", new Object[0]);
                log.info("关闭数据库连接...开始", new Object[0]);
                DBStoreServer.INST.shutdown();
                log.info("关闭数据库连接...结束", new Object[0]);
                this.status = MessengerStatus.STOPPED;
                log.info("关闭Messenger...结束", new Object[0]);
                return true;
        }
    }

    public MessengerNode getConnectNode() {
        return this.connectNode;
    }

    public MessengerNode getCenterNode() {
        return this.centerNode;
    }

    public int getRetryCount() {
        return this.retryCount;
    }

    public String getBaseDir() {
        return this.baseDir;
    }

    public String getDbUserName() {
        return this.dbUserName;
    }

    public String getDbPassword() {
        return this.dbPassword;
    }

    public int getFileMaxSaveDays() {
        return this.fileMaxSaveDays;
    }

    public void setFileMaxSaveDays(int i) {
        this.fileMaxSaveDays = i;
    }

    public boolean isMsgIdempotent() {
        return this.msgIdempotent;
    }

    static {
        log.debug("加载默认指令集", new Object[0]);
        registerDefaultCMDMessageHandler(DefaultCMD.FILE_HEADER, new AbstractMessageHandler() { // from class: com.kdgcsoft.sc.rdc.messenger.message.handler.FileHeaderHandler
            private static final Log log = LogFactory.get();

            @Override // com.kdgcsoft.sc.rdc.messenger.message.handler.AbstractMessageHandler, com.kdgcsoft.sc.rdc.messenger.message.ICmdMessageHandler
            public boolean onMessageReceived(CmdMessage cmdMessage) {
                CmdMessage fileMessage = ((FileHeaderBody) cmdMessage.getBody()).getFileMessage();
                try {
                    log.debug("{}收到文件头", new Object[]{cmdMessage.getBaseInfo()});
                    DBStoreServer.INST.saveReceiveMsg(fileMessage);
                    FileWriterHelper.INST.addFileWriter(fileMessage);
                    DBStoreServer.INST.updateReceiveMsgStatus(cmdMessage, MsgStatus.HEAD_SUCCESS);
                    log.debug("{}文件头指令本地准备完成,更新状态{}!", new Object[]{cmdMessage.getBaseInfo(), MsgStatus.HEAD_SUCCESS.toString()});
                    fileReady(fileMessage);
                    return true;
                } catch (Exception e) {
                    DBStoreServer.INST.updateReceiveMsgStatus(cmdMessage, MsgStatus.HEAD_ERROR);
                    log.error(e, "{}文件头接收失败,更新状态{}", new Object[]{cmdMessage.getBaseInfo(), MsgStatus.HEAD_ERROR.toString()});
                    return false;
                }
            }

            private synchronized void fileReady(CmdMessage cmdMessage) {
                CmdMessage cmdMessage2 = new CmdMessage(cmdMessage.buildBackMessageTrace(), RandomUtil.simpleUUID(), DefaultCMD.FILE_HEADER_READY.name(), false);
                cmdMessage2.setGroupMessageId(cmdMessage.getMessageId());
                cmdMessage2.setBody((FileBody) cmdMessage.getBody());
                Messenger.INST.sendMsg(cmdMessage2);
                log.debug("{}发送文件头接收完成指令!", new Object[]{cmdMessage.getBaseInfo()});
            }
        });
        registerDefaultCMDMessageHandler(DefaultCMD.FILE_HEADER_READY, new AbstractMessageHandler() { // from class: com.kdgcsoft.sc.rdc.messenger.message.handler.FileHeaderReadyHandler
            private static final Log log = LogFactory.get();

            @Override // com.kdgcsoft.sc.rdc.messenger.message.handler.AbstractMessageHandler, com.kdgcsoft.sc.rdc.messenger.message.ICmdMessageHandler
            public boolean onMessageReceived(CmdMessage cmdMessage) {
                log.debug("{}收到文件接收端准备完成指令,准备分块发送文件.", new Object[]{cmdMessage.getBaseInfo()});
                FileBody fileBody = (FileBody) cmdMessage.getBody();
                File file = new File(fileBody.getFileDiskPath());
                if (!file.exists()) {
                    log.error("{}文件不存在{},请检查!", new Object[]{cmdMessage.getBaseInfo(), file.getAbsolutePath()});
                    DBStoreServer.INST.updateSendMsgStatus(cmdMessage.getGroupMessageId(), MsgStatus.BLOCK_ERROR);
                    return false;
                }
                log.debug("{}对文件进行拆块发送{}!", new Object[]{cmdMessage.getBaseInfo(), file.getAbsolutePath()});
                RandomAccessFile randomAccessFile = null;
                try {
                    try {
                        long j = 0;
                        randomAccessFile = new RandomAccessFile(file, "r");
                        byte[] bArr = new byte[65536];
                        long j2 = 0;
                        while (true) {
                            int read = randomAccessFile.read(bArr);
                            if (read <= 0) {
                                break;
                            }
                            CmdMessage cmdMessage2 = new CmdMessage(cmdMessage.buildBackMessageTrace(), RandomUtil.simpleUUID(), DefaultCMD.FILE_BLOCK.name(), false);
                            cmdMessage2.setGroupMessageId(cmdMessage.getGroupMessageId());
                            FileBlockBody fileBlockBody = new FileBlockBody();
                            fileBlockBody.setFileId(fileBody.getFileId());
                            fileBlockBody.setFileTargetPath(fileBody.getFileTargetPath());
                            fileBlockBody.setBlockindex(Long.valueOf(j));
                            fileBlockBody.setStart(Long.valueOf(j2));
                            fileBlockBody.setLimit(read);
                            fileBlockBody.setFilelength(Long.valueOf(file.length()));
                            fileBlockBody.setContent(bArr);
                            cmdMessage2.setBody(fileBlockBody);
                            Messenger.INST.sendMsg(cmdMessage2);
                            j++;
                            j2 += 65536;
                            randomAccessFile.seek(j2);
                        }
                        log.debug("{}文件拆块发送完毕!", new Object[]{cmdMessage.getBaseInfo()});
                        if (randomAccessFile != null) {
                            try {
                                randomAccessFile.close();
                            } catch (IOException e) {
                                log.error(e, "RandomAccessFile关闭出错!", new Object[0]);
                            }
                        }
                        DBStoreServer.INST.updateSendMsgStatus(cmdMessage.getGroupMessageId(), MsgStatus.BLOCK_SUCCESS);
                        return true;
                    } catch (Throwable th) {
                        if (randomAccessFile != null) {
                            try {
                                randomAccessFile.close();
                            } catch (IOException e2) {
                                log.error(e2, "RandomAccessFile关闭出错!", new Object[0]);
                                throw th;
                            }
                        }
                        throw th;
                    }
                } catch (Exception e3) {
                    log.error(e3, "{}文件拆块发送失败!", new Object[]{cmdMessage.getBaseInfo()});
                    DBStoreServer.INST.updateSendMsgStatus(cmdMessage.getGroupMessageId(), MsgStatus.BLOCK_ERROR);
                    if (randomAccessFile != null) {
                        try {
                            randomAccessFile.close();
                        } catch (IOException e4) {
                            log.error(e4, "RandomAccessFile关闭出错!", new Object[0]);
                            return false;
                        }
                    }
                    return false;
                }
            }
        });
        registerDefaultCMDMessageHandler(DefaultCMD.FILE_BLOCK, new AbstractMessageHandler() { // from class: com.kdgcsoft.sc.rdc.messenger.message.handler.FileBlockHandler
            private static Logger log = LoggerFactory.getLogger(FileBlockHandler.class);

            @Override // com.kdgcsoft.sc.rdc.messenger.message.handler.AbstractMessageHandler, com.kdgcsoft.sc.rdc.messenger.message.ICmdMessageHandler
            public boolean onMessageReceived(CmdMessage cmdMessage) {
                FileBlockBody fileBlockBody = (FileBlockBody) cmdMessage.getBody();
                try {
                    FileWriter fileWriter = FileWriterHelper.INST.getFileWriter(cmdMessage.getGroupMessageId());
                    if (fileWriter == null) {
                        log.error("{} 块文件接收出错,块索引[{}],文件本地缓存文件未找到", cmdMessage.getBaseInfo(), fileBlockBody.getBlockindex());
                        return false;
                    }
                    CmdMessage filemessage = fileWriter.getFilemessage();
                    if (fileBlockBody.getBlockindex().equals(1L)) {
                        DBStoreServer.INST.updateReceiveMsgStatus(filemessage.getGroupMessageId(), MsgStatus.BLOCK_RECEIVEING);
                        log.info("{}文件接收开始", filemessage.getBaseInfo());
                    }
                    if (fileWriter.write(fileBlockBody)) {
                        completeFile(filemessage);
                    }
                    return true;
                } catch (Exception e) {
                    log.error("{}块文件接收出错,块索引[{}]", new Object[]{cmdMessage.getBaseInfo(), fileBlockBody.getBlockindex(), e});
                    DBStoreServer.INST.updateReceiveMsgStatus(cmdMessage.getGroupMessageId(), MsgStatus.BLOCK_ERROR);
                    return false;
                }
            }

            private synchronized void completeFile(CmdMessage cmdMessage) {
                boolean onMessageReceived;
                log.info("{}文件接收完成,进行后续处理", cmdMessage.getBaseInfo());
                FileBody fileBody = (FileBody) cmdMessage.getBody();
                String receiveBufferPath = DirHelper.INST.getReceiveBufferPath(cmdMessage.getMessageId());
                String receiveMatrixPath = DirHelper.INST.getReceiveMatrixPath(cmdMessage.getMessageId());
                String absolutePath = StrUtil.isNotEmpty(fileBody.getFileTargetPath()) ? FileUtil.getAbsolutePath(fileBody.getFileTargetPath()) : DirHelper.INST.getReceiveStoreFilePath(cmdMessage.getMessageId(), FileUtil.extName(fileBody.getFileDiskPath()));
                log.debug("{}文件传输完成,进行文件转存从{}到{}", new Object[]{cmdMessage.getBaseInfo(), receiveBufferPath, absolutePath});
                FileUtil.copy(receiveBufferPath, absolutePath, true);
                log.debug("{}文件转存完成,删除临时缓冲文件{},{}", new Object[]{cmdMessage.getBaseInfo(), receiveBufferPath, receiveMatrixPath});
                FileUtil.del(receiveBufferPath);
                FileUtil.del(receiveMatrixPath);
                fileBody.setReceivedStorePath(absolutePath);
                DBStoreServer.INST.saveReceiveMsg(cmdMessage);
                DBStoreServer.INST.updateReceiveMsgStatus(cmdMessage.getGroupMessageId(), MsgStatus.BLOCK_SUCCESS);
                if (FileWriterHelper.INST.getFileWriter(cmdMessage.getGroupMessageId()) != null) {
                    try {
                        AbstractMessageHandler messageHandler = Messenger.INST.getMessageHandler(cmdMessage.getCommand());
                        if (messageHandler == null) {
                            onMessageReceived = true;
                            log.error("{}未找到文件接收完成后续处理类,系统默认文件指令处理成功", cmdMessage.getBaseInfo());
                        } else {
                            log.debug("{}文件接收完成,交由{}.onMessageReceived处理", cmdMessage.getBaseInfo(), messageHandler.getClass().getSimpleName());
                            onMessageReceived = messageHandler.onMessageReceived(cmdMessage);
                            log.debug("{} {}.onMessageReceived 处理结果{}", new Object[]{cmdMessage.getBaseInfo(), messageHandler.getClass().getSimpleName(), Boolean.valueOf(onMessageReceived)});
                        }
                        if (onMessageReceived) {
                            log.debug("{}构建文件接收完成返回消息", cmdMessage.getBaseInfo());
                            CmdMessage cmdMessage2 = new CmdMessage(cmdMessage.buildBackMessageTrace(), RandomUtil.simpleUUID(), DefaultCMD.FILE_COMPLETE.name(), false);
                            cmdMessage2.setGroupMessageId(cmdMessage.getMessageId());
                            cmdMessage2.setBody(cmdMessage.getBody());
                            DBStoreServer.INST.updateReceiveMsgStatus(cmdMessage, MsgStatus.SUCCESS);
                            log.debug("{}更新文件消息状态:{}", cmdMessage.getBaseInfo(), MsgStatus.SUCCESS.toString());
                            Messenger.INST.sendMsg(cmdMessage2);
                            log.debug("{}发送文件接收完成消息", cmdMessage.getBaseInfo());
                            FileWriterHelper.INST.removeFileWriter(cmdMessage.getGroupMessageId());
                            log.debug("{}移除本地文件缓存文件操作类", cmdMessage.getBaseInfo());
                        } else {
                            DBStoreServer.INST.updateReceiveMsgStatus(cmdMessage, MsgStatus.RECEIVED_HANDLER_ERROR);
                            log.debug("{}更新文件消息状态:{},等待定时任务会进行再次处理", cmdMessage.getBaseInfo(), MsgStatus.RECEIVED_HANDLER_ERROR.toString());
                        }
                    } catch (Exception e) {
                        DBStoreServer.INST.updateReceiveMsgStatus(cmdMessage.getGroupMessageId(), MsgStatus.RECEIVED_HANDLER_ERROR);
                        log.error("{}文件接收完成后续处理出错", cmdMessage.getBaseInfo(), e);
                    }
                }
            }
        });
        registerDefaultCMDMessageHandler(DefaultCMD.FILE_COMPLETE, new AbstractMessageHandler() { // from class: com.kdgcsoft.sc.rdc.messenger.message.handler.FileCompleteHandler
            private static final Log log = LogFactory.get();

            @Override // com.kdgcsoft.sc.rdc.messenger.message.handler.AbstractMessageHandler, com.kdgcsoft.sc.rdc.messenger.message.ICmdMessageHandler
            public boolean onMessageReceived(CmdMessage cmdMessage) {
                log.debug("{}收到文件发送完成指令!", new Object[]{cmdMessage.getBaseInfo()});
                CmdMessage sendMsg = DBStoreServer.INST.getSendMsg(cmdMessage.getGroupMessageId());
                if (sendMsg == null) {
                    log.error("{}没有找到原始消息ID,无法重发!", new Object[]{cmdMessage.getBaseInfo()});
                    return false;
                }
                sendMsg.setBody(cmdMessage.getBody());
                DBStoreServer.INST.saveSendMsg(sendMsg);
                AbstractMessageHandler messageHandler = Messenger.INST.getMessageHandler(sendMsg.getCommand());
                if (messageHandler == null) {
                    DBStoreServer.INST.updateSendMsgStatus(sendMsg, MsgStatus.SUCCESS);
                    log.warn("{}没有找到文件接收完成处理类,更新状态{}", new Object[]{cmdMessage.getBaseInfo(), MsgStatus.SUCCESS.name()});
                    return true;
                }
                boolean onMessageSended = messageHandler.onMessageSended(sendMsg);
                log.debug("{}文件发送完成处理类{}.onMessageSended返回结果{}", new Object[]{cmdMessage.getBaseInfo(), messageHandler.getClass().getSimpleName(), Boolean.valueOf(onMessageSended)});
                if (onMessageSended) {
                    log.debug("{}更新状态{}", new Object[]{sendMsg.getBaseInfo(), MsgStatus.SUCCESS.name()});
                    DBStoreServer.INST.updateSendMsgStatus(sendMsg, MsgStatus.SUCCESS);
                } else {
                    log.debug("{}更新状态{}", new Object[]{sendMsg.getBaseInfo(), MsgStatus.SENDED_HANDLER_ERROR.name()});
                    DBStoreServer.INST.updateSendMsgStatus(sendMsg, MsgStatus.SENDED_HANDLER_ERROR);
                }
                return onMessageSended;
            }
        });
        registerDefaultCMDMessageHandler(DefaultCMD.FILE_RESEND, new AbstractMessageHandler() { // from class: com.kdgcsoft.sc.rdc.messenger.message.handler.FileResendHandler
            private static final Log log = LogFactory.get();

            @Override // com.kdgcsoft.sc.rdc.messenger.message.handler.AbstractMessageHandler, com.kdgcsoft.sc.rdc.messenger.message.ICmdMessageHandler
            public boolean onMessageReceived(CmdMessage cmdMessage) {
                log.debug("{}收到文件重发指令消息!", new Object[]{cmdMessage.getBaseInfo()});
                try {
                    CmdMessage sendMsg = DBStoreServer.INST.getSendMsg(cmdMessage.getGroupMessageId());
                    if (sendMsg == null) {
                        log.error("{}没有找到原始消息ID,无法重发!", new Object[]{cmdMessage.getBaseInfo()});
                        return false;
                    }
                    Messenger.INST.sendMsg(sendMsg);
                    return true;
                } catch (Exception e) {
                    log.error(e, "{}文件重发失败", new Object[]{cmdMessage.getBaseInfo()});
                    return false;
                }
            }
        });
        registerDefaultCMDMessageHandler(DefaultCMD.FILE_BLOCK_RESEND, new AbstractMessageHandler() { // from class: com.kdgcsoft.sc.rdc.messenger.message.handler.FileBlockResendHandler
            private static final Log log = LogFactory.get();

            @Override // com.kdgcsoft.sc.rdc.messenger.message.handler.AbstractMessageHandler, com.kdgcsoft.sc.rdc.messenger.message.ICmdMessageHandler
            public boolean onMessageReceived(CmdMessage cmdMessage) {
                log.debug("{}收到文件块重发指令!", new Object[]{cmdMessage.getBaseInfo()});
                FileBlockResendBody fileBlockResendBody = (FileBlockResendBody) cmdMessage.getBody();
                CmdMessage sendMsg = DBStoreServer.INST.getSendMsg(cmdMessage.getGroupMessageId());
                if (sendMsg == null) {
                    log.error("{}没有找到原始消息ID,无法重发!", new Object[]{cmdMessage.getBaseInfo()});
                    return false;
                }
                FileBody fileBody = (FileBody) sendMsg.getBody();
                File file = new File(fileBody.getFileDiskPath());
                log.debug("{}根据矩阵信息重发文件块{}", new Object[]{cmdMessage.getBaseInfo(), file.getAbsolutePath()});
                try {
                    RandomAccessFile randomAccessFile = new RandomAccessFile(fileBody.getFileDiskPath(), "rw");
                    Throwable th = null;
                    try {
                        try {
                            byte[] bArr = new byte[65536];
                            byte[] matrix = fileBlockResendBody.getMatrix();
                            for (int i = 0; i < matrix.length; i++) {
                                if (matrix[i] == 0) {
                                    long j = i * 65536;
                                    randomAccessFile.seek(j);
                                    int read = randomAccessFile.read(bArr);
                                    CmdMessage cmdMessage2 = new CmdMessage(cmdMessage.buildBackMessageTrace(), RandomUtil.simpleUUID(), DefaultCMD.FILE_BLOCK.name(), false);
                                    cmdMessage2.setGroupMessageId(cmdMessage.getGroupMessageId());
                                    FileBlockBody fileBlockBody = new FileBlockBody();
                                    fileBlockBody.setFileId(fileBody.getFileId());
                                    fileBlockBody.setFileTargetPath(fileBody.getFileTargetPath());
                                    fileBlockBody.setBlockindex(Long.valueOf(i));
                                    fileBlockBody.setStart(Long.valueOf(j));
                                    fileBlockBody.setLimit(read);
                                    fileBlockBody.setFilelength(Long.valueOf(file.length()));
                                    fileBlockBody.setContent(bArr);
                                    cmdMessage2.setBody(fileBlockBody);
                                    Messenger.INST.sendMsg(cmdMessage2);
                                }
                            }
                            log.debug("{}矩阵信息中缺失的文件块已经重发", new Object[]{cmdMessage.getBaseInfo()});
                            if (randomAccessFile != null) {
                                if (0 != 0) {
                                    try {
                                        randomAccessFile.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    randomAccessFile.close();
                                }
                            }
                            return true;
                        } finally {
                        }
                    } catch (Throwable th3) {
                        if (randomAccessFile != null) {
                            if (th != null) {
                                try {
                                    randomAccessFile.close();
                                } catch (Throwable th4) {
                                    th.addSuppressed(th4);
                                }
                            } else {
                                randomAccessFile.close();
                            }
                        }
                        throw th3;
                    }
                } catch (FileNotFoundException e) {
                    log.error(e, "{}文件块重发失败", new Object[]{cmdMessage.getBaseInfo()});
                    return false;
                } catch (IOException e2) {
                    log.error(e2, "{}文件块重发失败", new Object[]{cmdMessage.getBaseInfo()});
                    return false;
                }
            }
        });
        registerDefaultCMDMessageHandler(DefaultCMD.FILE_FAILED, new AbstractMessageHandler() { // from class: com.kdgcsoft.sc.rdc.messenger.message.handler.FileFailedHandler
            private static final Log log = LogFactory.get();

            @Override // com.kdgcsoft.sc.rdc.messenger.message.handler.AbstractMessageHandler, com.kdgcsoft.sc.rdc.messenger.message.ICmdMessageHandler
            public boolean onMessageReceived(CmdMessage cmdMessage) {
                log.debug("{}收到消息经重试后失败的消息,调用发送方的onMessageRetryFailed方法!", new Object[]{cmdMessage.getBaseInfo()});
                CmdMessage cmdMessage2 = (CmdMessage) cmdMessage.getBody();
                AbstractMessageHandler messageHandler = Messenger.INST.getMessageHandler(cmdMessage2.getCommand());
                if (messageHandler == null) {
                    log.error("{}未找到失败处理类", new Object[]{cmdMessage2.getBaseInfo()});
                    return true;
                }
                log.error("{}交由{}.onMessageRetryFailed处理", new Object[]{cmdMessage2.getBaseInfo(), messageHandler.getClass().getSimpleName()});
                messageHandler.onMessageRetryFailed(cmdMessage);
                return true;
            }
        });
        registerDefaultCMDMessageHandler(DefaultCMD.MSG_REPLAY, new AbstractMessageHandler() { // from class: com.kdgcsoft.sc.rdc.messenger.message.handler.MsgReplyHandler
            private static final Log log = LogFactory.get();

            @Override // com.kdgcsoft.sc.rdc.messenger.message.handler.AbstractMessageHandler, com.kdgcsoft.sc.rdc.messenger.message.ICmdMessageHandler
            public boolean onMessageReceived(CmdMessage cmdMessage) {
                if (MessageType.FILE != Messenger.INST.getMessageType(cmdMessage.getCommand()) || !cmdMessage.isStore()) {
                    return true;
                }
                CmdMessage sendMsg = DBStoreServer.INST.getSendMsg(cmdMessage.getGroupMessageId());
                if (sendMsg == null) {
                    log.warn("{}没有找到原始消息ID!默认消息消费成功,", new Object[]{cmdMessage.getBaseInfo()});
                    return true;
                }
                AbstractMessageHandler messageHandler = Messenger.INST.getMessageHandler(sendMsg.getCommand());
                if (messageHandler == null) {
                    log.warn("{}没有找到消息完成处理类,默认处理结果为MsgStatus.SUCCESS", new Object[]{cmdMessage.getBaseInfo()});
                    DBStoreServer.INST.updateSendMsgStatus(sendMsg, MsgStatus.SUCCESS);
                    return true;
                }
                boolean onMessageSended = messageHandler.onMessageSended(sendMsg);
                log.debug("{}收到消息消费成功指令{}.onMessageSended 返回结果{}", new Object[]{cmdMessage.getBaseInfo(), messageHandler.getClass().getSimpleName(), Boolean.valueOf(onMessageSended)});
                if (onMessageSended) {
                    DBStoreServer.INST.updateSendMsgStatus(sendMsg, MsgStatus.SUCCESS);
                } else {
                    DBStoreServer.INST.updateSendMsgStatus(sendMsg, MsgStatus.SENDED_HANDLER_ERROR);
                }
                return onMessageSended;
            }
        });
        log.debug("指令集加载完成", new Object[0]);
    }
}
