package com.kdgcsoft.dtp.plugin.extend.read.rabbitmq;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson2.JSON;
import com.kdgcsoft.dtp.plugin.extend.read.rabbitmq.factory.ConnectionFactoryBuilder;
import com.kdgcsoft.dtp.plugin.extend.read.rabbitmq.factory.SimpleMessageListenerContainerFactory;
import com.kdgcsoft.dtp.plugin.extend.read.rabbitmq.properties.RabbitmqProperties;
import com.kdgcsoft.dtp.plugin.writer.databaseWriter.service.IBlockStreamReader;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.util.StringUtils;

/* loaded from: input_file:com/kdgcsoft/dtp/plugin/extend/read/rabbitmq/RabbitmqStreamReader.class */
public class RabbitmqStreamReader implements MessageListener, IBlockStreamReader {
    private final AbstractMessageListenerContainer container;
    private final RabbitmqProperties properties;
    private CachingConnectionFactory connectionFactory;
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final BlockingDeque<Map<String, String>> caches = new LinkedBlockingDeque(100000);
    private volatile String[] headers;

    public RabbitmqStreamReader(RabbitmqProperties rabbitmqProperties) {
        this.properties = rabbitmqProperties;
        SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory = new SimpleMessageListenerContainerFactory(this, rabbitmqProperties.getQueueName());
        this.connectionFactory = ConnectionFactoryBuilder.builder(rabbitmqProperties);
        this.container = simpleMessageListenerContainerFactory.createMessageListenerContainer(this.connectionFactory);
    }

    public void start() {
        this.logger.info("消息监听器已经启动，监听{}队列信息", this.properties.getQueueName());
        this.container.start();
    }

    public boolean hasMessage() {
        return !this.caches.isEmpty();
    }

    public void onMessage(Message message) {
        String str = new String(message.getBody(), Charset.defaultCharset());
        if (StringUtils.hasText(str)) {
            try {
                if (JSON.isValidObject(str)) {
                    Map<String, String> jsonObjectToMap = jsonObjectToMap(JSONObject.parseObject(str));
                    setHeaders(jsonObjectToMap);
                    this.caches.put(jsonObjectToMap);
                } else if (JSON.isValidArray(str)) {
                    JSONArray parseArray = JSONArray.parseArray(str);
                    for (int i = 0; i < parseArray.size(); i++) {
                        Map<String, String> jsonObjectToMap2 = jsonObjectToMap(parseArray.getJSONObject(i));
                        setHeaders(jsonObjectToMap2);
                        this.caches.put(jsonObjectToMap2);
                    }
                } else {
                    this.logger.debug("rabbitmq 消息格式不合法，已经自动忽略 ：{}", str);
                }
            } catch (Exception e) {
                this.logger.error("消息解析失败，不是标准的json数据：\n" + str);
            }
        }
    }

    private void setHeaders(Map<String, String> map) {
        if (this.headers == null) {
            synchronized (RabbitmqStreamReader.class) {
                if (this.headers == null) {
                    this.headers = (String[]) map.keySet().toArray(new String[0]);
                }
            }
        }
    }

    private Map<String, String> jsonObjectToMap(JSONObject jSONObject) {
        HashMap hashMap = new HashMap();
        for (String str : jSONObject.keySet()) {
            hashMap.put(str, jSONObject.getString(str));
        }
        return hashMap;
    }

    public String[] getHead() {
        return this.headers;
    }

    public String getValue(String[] strArr, String str) {
        return null;
    }

    /* renamed from: next, reason: merged with bridge method [inline-methods] */
    public String[] m1next() {
        try {
            Map<String, String> poll = this.caches.poll(1L, TimeUnit.SECONDS);
            if (poll != null) {
                return (String[]) poll.values().toArray(new String[0]);
            }
            return null;
        } catch (InterruptedException e) {
            this.logger.error("获取阻塞队列中数据异常，异常信息为：" + e.getLocalizedMessage());
            return null;
        }
    }

    public boolean hasNext() {
        return !this.caches.isEmpty();
    }

    public void close() {
        this.container.destroy();
        this.connectionFactory.destroy();
    }
}
