package com.alibaba.otter.canal.server.embedded;

import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
import com.alibaba.otter.canal.instance.core.CanalInstance;
import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.ClientIdentity;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.position.Position;
import com.alibaba.otter.canal.protocol.position.PositionRange;
import com.alibaba.otter.canal.server.CanalServer;
import com.alibaba.otter.canal.server.CanalService;
import com.alibaba.otter.canal.server.exception.CanalServerException;
import com.alibaba.otter.canal.spi.CanalMetricsProvider;
import com.alibaba.otter.canal.spi.CanalMetricsService;
import com.alibaba.otter.canal.spi.NopCanalMetricsService;
import com.alibaba.otter.canal.store.CanalEventStore;
import com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer;
import com.alibaba.otter.canal.store.model.Event;
import com.alibaba.otter.canal.store.model.Events;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.MigrateMap;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.class */
public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements CanalServer, CanalService {
    private static final Logger logger = LoggerFactory.getLogger(CanalServerWithEmbedded.class);
    private Map<String, CanalInstance> canalInstances;
    private CanalInstanceGenerator canalInstanceGenerator;
    private int metricsPort;
    private CanalMetricsService metrics = NopCanalMetricsService.NOP;

    /* loaded from: input_file:com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded$SingletonHolder.class */
    private static class SingletonHolder {
        private static final CanalServerWithEmbedded CANAL_SERVER_WITH_EMBEDDED = new CanalServerWithEmbedded();

        private SingletonHolder() {
        }
    }

    public static CanalServerWithEmbedded instance() {
        return SingletonHolder.CANAL_SERVER_WITH_EMBEDDED;
    }

    @Override // com.alibaba.otter.canal.server.CanalServer
    public void start() {
        if (isStart()) {
            return;
        }
        super.start();
        loadCanalMetrics();
        this.metrics.setServerPort(this.metricsPort);
        this.metrics.initialize();
        this.canalInstances = MigrateMap.makeComputingMap(new Function<String, CanalInstance>() { // from class: com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded.1
            public CanalInstance apply(String str) {
                return CanalServerWithEmbedded.this.canalInstanceGenerator.generate(str);
            }
        });
    }

    @Override // com.alibaba.otter.canal.server.CanalServer
    public void stop() {
        super.stop();
        for (Map.Entry<String, CanalInstance> entry : this.canalInstances.entrySet()) {
            try {
                if (entry.getValue().isStart()) {
                    try {
                        String key = entry.getKey();
                        MDC.put("destination", key);
                        entry.getValue().stop();
                        logger.info("stop CanalInstances[{}] successfully", key);
                        MDC.remove("destination");
                    } catch (Throwable th) {
                        MDC.remove("destination");
                        throw th;
                        break;
                    }
                }
            } catch (Exception e) {
                logger.error(String.format("stop CanalInstance[%s] has an error", entry.getKey()), e);
            }
        }
        this.metrics.terminate();
    }

    public void start(String str) {
        CanalInstance canalInstance = this.canalInstances.get(str);
        if (canalInstance.isStart()) {
            return;
        }
        try {
            MDC.put("destination", str);
            if (this.metrics.isRunning()) {
                this.metrics.register(canalInstance);
            }
            canalInstance.start();
            logger.info("start CanalInstances[{}] successfully", str);
            MDC.remove("destination");
        } catch (Throwable th) {
            MDC.remove("destination");
            throw th;
        }
    }

    public void stop(String str) {
        CanalInstance remove = this.canalInstances.remove(str);
        if (remove == null || !remove.isStart()) {
            return;
        }
        try {
            MDC.put("destination", str);
            remove.stop();
            if (this.metrics.isRunning()) {
                this.metrics.unregister(remove);
            }
            logger.info("stop CanalInstances[{}] successfully", str);
            MDC.remove("destination");
        } catch (Throwable th) {
            MDC.remove("destination");
            throw th;
        }
    }

    public boolean isStart(String str) {
        return this.canalInstances.containsKey(str) && this.canalInstances.get(str).isStart();
    }

    @Override // com.alibaba.otter.canal.server.CanalService
    public void subscribe(ClientIdentity clientIdentity) throws CanalServerException {
        checkStart(clientIdentity.getDestination());
        CanalInstance canalInstance = this.canalInstances.get(clientIdentity.getDestination());
        if (!canalInstance.getMetaManager().isStart()) {
            canalInstance.getMetaManager().start();
        }
        canalInstance.getMetaManager().subscribe(clientIdentity);
        Position cursor = canalInstance.getMetaManager().getCursor(clientIdentity);
        if (cursor == null) {
            Position firstPosition = canalInstance.getEventStore().getFirstPosition();
            if (firstPosition != null) {
                canalInstance.getMetaManager().updateCursor(clientIdentity, firstPosition);
            }
            logger.info("subscribe successfully, {} with first position:{} ", clientIdentity, firstPosition);
        } else {
            logger.info("subscribe successfully, use last cursor position:{} ", clientIdentity, cursor);
        }
        canalInstance.subscribeChange(clientIdentity);
    }

    @Override // com.alibaba.otter.canal.server.CanalService
    public void unsubscribe(ClientIdentity clientIdentity) throws CanalServerException {
        this.canalInstances.get(clientIdentity.getDestination()).getMetaManager().unsubscribe(clientIdentity);
        logger.info("unsubscribe successfully, {}", clientIdentity);
    }

    public List<ClientIdentity> listAllSubscribe(String str) throws CanalServerException {
        return this.canalInstances.get(str).getMetaManager().listAllSubscribeInfo(str);
    }

    @Override // com.alibaba.otter.canal.server.CanalService
    public Message get(ClientIdentity clientIdentity, int i) throws CanalServerException {
        return get(clientIdentity, i, null, null);
    }

    @Override // com.alibaba.otter.canal.server.CanalService
    public Message get(ClientIdentity clientIdentity, int i, Long l, TimeUnit timeUnit) throws CanalServerException {
        checkStart(clientIdentity.getDestination());
        checkSubscribe(clientIdentity);
        CanalInstance canalInstance = this.canalInstances.get(clientIdentity.getDestination());
        synchronized (canalInstance) {
            PositionRange lastestBatch = canalInstance.getMetaManager().getLastestBatch(clientIdentity);
            if (lastestBatch != null) {
                throw new CanalServerException(String.format("clientId:%s has last batch:[%s] isn't ack , maybe loss data", Short.valueOf(clientIdentity.getClientId()), lastestBatch));
            }
            Events<Event> events = getEvents(canalInstance.getEventStore(), canalInstance.getMetaManager().getCursor(clientIdentity), i, l, timeUnit);
            if (CollectionUtils.isEmpty(events.getEvents())) {
                logger.debug("get successfully, clientId:{} batchSize:{} but result is null", Short.valueOf(clientIdentity.getClientId()), Integer.valueOf(i));
                return new Message(-1L, true, new ArrayList());
            }
            Long addBatch = canalInstance.getMetaManager().addBatch(clientIdentity, events.getPositionRange());
            boolean isRaw = isRaw(canalInstance.getEventStore());
            List transform = isRaw ? Lists.transform(events.getEvents(), new Function<Event, ByteString>() { // from class: com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded.2
                public ByteString apply(Event event) {
                    return event.getRawEntry();
                }
            }) : Lists.transform(events.getEvents(), new Function<Event, CanalEntry.Entry>() { // from class: com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded.3
                public CanalEntry.Entry apply(Event event) {
                    return event.getEntry();
                }
            });
            if (logger.isInfoEnabled()) {
                logger.info("get successfully, clientId:{} batchSize:{} real size is {} and result is [batchId:{} , position:{}]", new Object[]{Short.valueOf(clientIdentity.getClientId()), Integer.valueOf(i), Integer.valueOf(transform.size()), addBatch, events.getPositionRange()});
            }
            ack(clientIdentity, addBatch.longValue());
            return new Message(addBatch.longValue(), isRaw, transform);
        }
    }

    @Override // com.alibaba.otter.canal.server.CanalService
    public Message getWithoutAck(ClientIdentity clientIdentity, int i) throws CanalServerException {
        return getWithoutAck(clientIdentity, i, null, null);
    }

    @Override // com.alibaba.otter.canal.server.CanalService
    public Message getWithoutAck(ClientIdentity clientIdentity, int i, Long l, TimeUnit timeUnit) throws CanalServerException {
        Events<Event> events;
        checkStart(clientIdentity.getDestination());
        checkSubscribe(clientIdentity);
        CanalInstance canalInstance = this.canalInstances.get(clientIdentity.getDestination());
        synchronized (canalInstance) {
            PositionRange lastestBatch = canalInstance.getMetaManager().getLastestBatch(clientIdentity);
            if (lastestBatch != null) {
                events = getEvents(canalInstance.getEventStore(), lastestBatch.getStart(), i, l, timeUnit);
            } else {
                Position cursor = canalInstance.getMetaManager().getCursor(clientIdentity);
                if (cursor == null) {
                    cursor = canalInstance.getEventStore().getFirstPosition();
                }
                events = getEvents(canalInstance.getEventStore(), cursor, i, l, timeUnit);
            }
            if (CollectionUtils.isEmpty(events.getEvents())) {
                return new Message(-1L, true, new ArrayList());
            }
            Long addBatch = canalInstance.getMetaManager().addBatch(clientIdentity, events.getPositionRange());
            boolean isRaw = isRaw(canalInstance.getEventStore());
            List transform = isRaw ? Lists.transform(events.getEvents(), new Function<Event, ByteString>() { // from class: com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded.4
                public ByteString apply(Event event) {
                    return event.getRawEntry();
                }
            }) : Lists.transform(events.getEvents(), new Function<Event, CanalEntry.Entry>() { // from class: com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded.5
                public CanalEntry.Entry apply(Event event) {
                    return event.getEntry();
                }
            });
            if (logger.isInfoEnabled()) {
                logger.info("getWithoutAck successfully, clientId:{} batchSize:{}  real size is {} and result is [batchId:{} , position:{}]", new Object[]{Short.valueOf(clientIdentity.getClientId()), Integer.valueOf(i), Integer.valueOf(transform.size()), addBatch, events.getPositionRange()});
            }
            return new Message(addBatch.longValue(), isRaw, transform);
        }
    }

    public List<Long> listBatchIds(ClientIdentity clientIdentity) throws CanalServerException {
        checkStart(clientIdentity.getDestination());
        checkSubscribe(clientIdentity);
        ArrayList arrayList = new ArrayList(this.canalInstances.get(clientIdentity.getDestination()).getMetaManager().listAllBatchs(clientIdentity).keySet());
        Collections.sort(arrayList);
        return arrayList;
    }

    @Override // com.alibaba.otter.canal.server.CanalService
    public void ack(ClientIdentity clientIdentity, long j) throws CanalServerException {
        checkStart(clientIdentity.getDestination());
        checkSubscribe(clientIdentity);
        CanalInstance canalInstance = this.canalInstances.get(clientIdentity.getDestination());
        PositionRange removeBatch = canalInstance.getMetaManager().removeBatch(clientIdentity, Long.valueOf(j));
        if (removeBatch == null) {
            throw new CanalServerException(String.format("ack error , clientId:%s batchId:%d is not exist , please check", Short.valueOf(clientIdentity.getClientId()), Long.valueOf(j)));
        }
        if (removeBatch.getAck() != null) {
            canalInstance.getMetaManager().updateCursor(clientIdentity, removeBatch.getAck());
            if (logger.isInfoEnabled()) {
                logger.info("ack successfully, clientId:{} batchId:{} position:{}", new Object[]{Short.valueOf(clientIdentity.getClientId()), Long.valueOf(j), removeBatch});
            }
        }
        canalInstance.getEventStore().ack(removeBatch.getEnd());
    }

    @Override // com.alibaba.otter.canal.server.CanalService
    public void rollback(ClientIdentity clientIdentity) throws CanalServerException {
        checkStart(clientIdentity.getDestination());
        CanalInstance canalInstance = this.canalInstances.get(clientIdentity.getDestination());
        if (canalInstance.getMetaManager().hasSubscribe(clientIdentity)) {
            synchronized (canalInstance) {
                canalInstance.getMetaManager().clearAllBatchs(clientIdentity);
                canalInstance.getEventStore().rollback();
                logger.info("rollback successfully, clientId:{}", new Object[]{Short.valueOf(clientIdentity.getClientId())});
            }
        }
    }

    @Override // com.alibaba.otter.canal.server.CanalService
    public void rollback(ClientIdentity clientIdentity, Long l) throws CanalServerException {
        checkStart(clientIdentity.getDestination());
        CanalInstance canalInstance = this.canalInstances.get(clientIdentity.getDestination());
        if (canalInstance.getMetaManager().hasSubscribe(clientIdentity)) {
            synchronized (canalInstance) {
                PositionRange removeBatch = canalInstance.getMetaManager().removeBatch(clientIdentity, l);
                if (removeBatch == null) {
                    throw new CanalServerException(String.format("rollback error, clientId:%s batchId:%d is not exist , please check", Short.valueOf(clientIdentity.getClientId()), l));
                }
                canalInstance.getEventStore().rollback();
                logger.info("rollback successfully, clientId:{} batchId:{} position:{}", new Object[]{Short.valueOf(clientIdentity.getClientId()), l, removeBatch});
            }
        }
    }

    public Map<String, CanalInstance> getCanalInstances() {
        return Maps.newHashMap(this.canalInstances);
    }

    private Events<Event> getEvents(CanalEventStore canalEventStore, Position position, int i, Long l, TimeUnit timeUnit) {
        if (l == null) {
            return canalEventStore.tryGet(position, i);
        }
        try {
            return l.longValue() <= 0 ? canalEventStore.get(position, i) : canalEventStore.get(position, i, l.longValue(), timeUnit);
        } catch (Exception e) {
            throw new CanalServerException(e);
        }
    }

    private void checkSubscribe(ClientIdentity clientIdentity) {
        if (!this.canalInstances.get(clientIdentity.getDestination()).getMetaManager().hasSubscribe(clientIdentity)) {
            throw new CanalServerException(String.format("ClientIdentity:%s should subscribe first", clientIdentity.toString()));
        }
    }

    private void checkStart(String str) {
        if (!isStart(str)) {
            throw new CanalServerException(String.format("destination:%s should start first", str));
        }
    }

    private void loadCanalMetrics() {
        ServiceLoader load = ServiceLoader.load(CanalMetricsProvider.class);
        ArrayList arrayList = new ArrayList();
        Iterator it = load.iterator();
        while (it.hasNext()) {
            arrayList.add((CanalMetricsProvider) it.next());
        }
        if (arrayList.isEmpty()) {
            return;
        }
        if (arrayList.size() > 1) {
            logger.warn("Found more than one CanalMetricsProvider, use the first one.");
            Iterator it2 = arrayList.iterator();
            while (it2.hasNext()) {
                logger.warn("Found CanalMetricsProvider: {}.", ((CanalMetricsProvider) it2.next()).getClass().getName());
            }
        }
        this.metrics = ((CanalMetricsProvider) arrayList.get(0)).getService();
    }

    private boolean isRaw(CanalEventStore canalEventStore) {
        if (canalEventStore instanceof MemoryEventStoreWithBuffer) {
            return ((MemoryEventStoreWithBuffer) canalEventStore).isRaw();
        }
        return true;
    }

    public void setCanalInstanceGenerator(CanalInstanceGenerator canalInstanceGenerator) {
        this.canalInstanceGenerator = canalInstanceGenerator;
    }

    public void setMetricsPort(int i) {
        this.metricsPort = i;
    }
}
