package de.xab.porter.transfer.channel;

import de.xab.porter.api.Result;
import de.xab.porter.api.exception.PorterException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/* loaded from: input_file:de/xab/porter/transfer/channel/DefaultChannel.class */
/**
 * {@link Channel} implementation backed by a small bounded {@link BlockingQueue}.
 *
 * <p>{@link #push(Result)} enqueues a result and then immediately calls
 * {@link #notifyWriter()}, which dequeues the head of the queue and passes it to the
 * listener registered through {@link #setOnReadListener(Consumer)}. The listener is
 * therefore invoked on the thread that called {@code push}.
 */
public class DefaultChannel implements Channel {
    /** Maximum number of results the queue holds before {@code put} blocks. */
    private static final int QUEUE_CAPACITY = 2;

    private final BlockingQueue<Result<?>> resultQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
    private Consumer<Result<?>> onReadListener;

    /**
     * Enqueues {@code result}, blocking while the queue is full, and then hands the
     * head of the queue to the registered listener.
     *
     * @param result the result to enqueue
     * @throws PorterException if the calling thread is interrupted while waiting for
     *     queue space; the thread's interrupt flag is restored before throwing
     * @throws IllegalStateException if no read listener has been registered
     */
    @Override
    public void push(Result<?> result) {
        try {
            this.resultQueue.put(result);
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new PorterException("push data to channel failed", e);
        }
        notifyWriter();
    }

    /**
     * Removes and returns the head of the queue, blocking while the queue is empty.
     *
     * @return the dequeued result
     * @throws PorterException if the calling thread is interrupted while waiting for
     *     an element; the thread's interrupt flag is restored before throwing
     */
    @Override
    public Result<?> pop() {
        try {
            return this.resultQueue.take();
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new PorterException("consume data from channel failed", e);
        }
    }

    /**
     * Dequeues the head of the queue via {@link #pop()} and passes it to the
     * registered read listener.
     *
     * @throws IllegalStateException if no read listener has been registered; in that
     *     case nothing is removed from the queue
     */
    @Override
    public void notifyWriter() {
        Consumer<Result<?>> listener = this.onReadListener;
        // Check before popping: otherwise the element would be dequeued and then
        // lost when the call on the missing listener fails.
        if (listener == null) {
            throw new IllegalStateException("no read listener registered on channel");
        }
        listener.accept(pop());
    }

    /**
     * Registers the listener that receives each result dequeued by
     * {@link #notifyWriter()}. Replaces any previously registered listener.
     *
     * @param consumer the listener to invoke with dequeued results
     */
    @Override
    public void setOnReadListener(Consumer<Result<?>> consumer) {
        this.onReadListener = consumer;
    }
}
