package org.jetlinks.core.message.interceptor;

import com.github.benmanes.caffeine.cache.Caffeine;
import java.time.Duration;
import java.util.Map;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.utils.SerialFlux;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuples;

/* loaded from: input_file:org/jetlinks/core/message/interceptor/SerialDeviceMessageSenderInterceptor.class */
/**
 * A {@link DeviceMessageSenderInterceptor} that routes the reply stream of each outgoing
 * message through a per-key {@link SerialFlux}.
 *
 * <p>By default every message is routed this way and the key is the pair
 * (device id, message type); subclasses may override {@link #needSerial(DeviceMessage)}
 * and {@link #getSerialKey(DeviceMessage)} to change either decision.
 *
 * <p>NOTE(review): the name suggests {@link SerialFlux#join} makes sends that share a key
 * run one after another; the exact ordering guarantees live in {@code SerialFlux} and
 * should be confirmed there.
 */
public class SerialDeviceMessageSenderInterceptor implements DeviceMessageSenderInterceptor {

    /** Shared instance using the default {@code needSerial}/{@code getSerialKey} behavior. */
    public static final SerialDeviceMessageSenderInterceptor GLOBAL = new SerialDeviceMessageSenderInterceptor();

    /**
     * One {@link SerialFlux} per serial key. Backed by a Caffeine cache so that an entry
     * is dropped once it has not been accessed for one hour, instead of accumulating forever.
     * The explicit type witness on {@code build()} keeps the map fully typed without an
     * unchecked conversion.
     */
    final Map<Object, SerialFlux<DeviceMessage>> pending = Caffeine
        .newBuilder()
        .expireAfterAccess(Duration.ofHours(1))
        .<Object, SerialFlux<DeviceMessage>>build()
        .asMap();

    /**
     * Decides whether the given message should go through the per-key {@link SerialFlux}.
     *
     * @param deviceMessage the message about to be sent
     * @return {@code true} in this base implementation, for every message
     */
    protected boolean needSerial(DeviceMessage deviceMessage) {
        return true;
    }

    /**
     * Computes the key that selects which {@link SerialFlux} a message is joined to.
     *
     * @param deviceMessage the message about to be sent
     * @return a tuple of the message's device id and message type
     */
    protected Object getSerialKey(DeviceMessage deviceMessage) {
        return Tuples.of(deviceMessage.getDeviceId(), deviceMessage.getMessageType());
    }

    /**
     * Returns {@code flux} untouched when {@link #needSerial(DeviceMessage)} is {@code false};
     * otherwise joins it to the {@link SerialFlux} registered for the message's serial key,
     * creating that {@link SerialFlux} on first use.
     *
     * @param deviceOperator the target device (not used by this implementation)
     * @param deviceMessage  the message being sent
     * @param flux           the downstream reply stream for this send
     * @return either {@code flux} itself or the result of {@code SerialFlux.join(flux)}
     */
    @Override
    public Flux<DeviceMessage> doSend(DeviceOperator deviceOperator, DeviceMessage deviceMessage, Flux<DeviceMessage> flux) {
        if (!needSerial(deviceMessage)) {
            return flux;
        }
        // Diamond instead of a raw SerialFlux so the value type is checked by the compiler.
        return pending
            .computeIfAbsent(getSerialKey(deviceMessage), key -> new SerialFlux<>())
            .join(flux);
    }

    /**
     * @return {@link Integer#MIN_VALUE}; presumably this places the interceptor first among
     *         ordered interceptors — confirm against how the framework sorts by order
     */
    @Override
    public int getOrder() {
        return Integer.MIN_VALUE;
    }
}
