package org.eclipse.californium.core.test;

import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.californium.TestTools;
import org.eclipse.californium.core.CoapClient;
import org.eclipse.californium.core.CoapObserveRelation;
import org.eclipse.californium.core.CoapResource;
import org.eclipse.californium.core.CoapResponse;
import org.eclipse.californium.core.CoapServer;
import org.eclipse.californium.core.coap.CoAP;
import org.eclipse.californium.core.coap.EmptyMessage;
import org.eclipse.californium.core.coap.Request;
import org.eclipse.californium.core.coap.Response;
import org.eclipse.californium.core.coap.Token;
import org.eclipse.californium.core.config.CoapConfig;
import org.eclipse.californium.core.network.CoapEndpoint;
import org.eclipse.californium.core.network.Endpoint;
import org.eclipse.californium.core.network.EndpointManager;
import org.eclipse.californium.core.network.interceptors.MessageInterceptor;
import org.eclipse.californium.core.network.interceptors.MessageInterceptorAdapter;
import org.eclipse.californium.core.network.interceptors.MessageTracer;
import org.eclipse.californium.core.observe.Observation;
import org.eclipse.californium.core.observe.ObservationStore;
import org.eclipse.californium.core.observe.ObservationStoreException;
import org.eclipse.californium.core.observe.ObservationUtil;
import org.eclipse.californium.core.server.resources.CoapExchange;
import org.eclipse.californium.core.server.resources.Resource;
import org.eclipse.californium.elements.EndpointContext;
import org.eclipse.californium.elements.category.Medium;
import org.eclipse.californium.elements.config.Configuration;
import org.eclipse.californium.elements.rule.NetworkRule;
import org.eclipse.californium.elements.rule.TestNameLoggerRule;
import org.eclipse.californium.elements.util.ExecutorsUtil;
import org.eclipse.californium.elements.util.NamedThreadFactory;
import org.eclipse.californium.rule.CoapNetworkRule;
import org.eclipse.californium.rule.CoapThreadsRule;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({Medium.class})
/* loaded from: input_file:org/eclipse/californium/core/test/ObserveTest.class */
public class ObserveTest {
    static final String TARGET_X = "resX";
    static final String TARGET_Y = "resY";
    static final String RESPONSE = "hi";
    private CoapEndpoint serverEndpoint;
    private MyResource resourceX;
    private MyResource resourceY;
    private MyObservationStore observations;
    private String uriX;
    private String uriY;
    private static final Logger LOGGER = LoggerFactory.getLogger(ObserveTest.class);

    @ClassRule
    public static CoapNetworkRule network = new CoapNetworkRule(NetworkRule.Mode.DIRECT, NetworkRule.Mode.NATIVE);
    static final AtomicInteger counter = new AtomicInteger();

    @Rule
    public CoapThreadsRule cleanup = new CoapThreadsRule();

    @Rule
    public TestNameLoggerRule name = new TestNameLoggerRule();
    private final CountDownLatch waitforit = new CountDownLatch(1);

    /* loaded from: input_file:org/eclipse/californium/core/test/ObserveTest$ClientMessageInterceptor.class */
    private class ClientMessageInterceptor extends MessageInterceptorAdapter {
        private int counter;

        private ClientMessageInterceptor() {
            this.counter = 0;
        }

        public void receiveResponse(Response response) {
            this.counter++;
            switch (this.counter) {
                case 1:
                case 2:
                    return;
                case 3:
                case 4:
                    lose(response);
                    return;
                case 5:
                    lose(response);
                    ObserveTest.this.resourceX.changed();
                    return;
                case 6:
                    lose(response);
                    return;
                case 7:
                    lose(response);
                    ObserveTest.this.waitforit.countDown();
                    return;
                default:
                    throw new IllegalStateException("Should not receive " + this.counter + " responses");
            }
        }

        private void lose(Response response) {
            ObserveTest.LOGGER.info("Lose response {} with MID {}, payload = {}", new Object[]{Integer.valueOf(this.counter), Integer.valueOf(response.getMID()), response.getPayloadString()});
            response.cancel();
        }
    }

    /* loaded from: input_file:org/eclipse/californium/core/test/ObserveTest$MessageObserverCounterMessageInterceptor.class */
    private class MessageObserverCounterMessageInterceptor extends MessageInterceptorAdapter {
        private int messageObserverCounter;

        private MessageObserverCounterMessageInterceptor() {
            this.messageObserverCounter = 0;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public synchronized int getMessageObserverCounter() {
            return this.messageObserverCounter;
        }

        public synchronized void sendRequest(Request request) {
            this.messageObserverCounter = request.getMessageObservers().size();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/eclipse/californium/core/test/ObserveTest$MyObservationStore.class */
    public static class MyObservationStore implements ObservationStore {
        private final ConcurrentMap<Token, Observation> map = new ConcurrentHashMap();
        private volatile AtomicReference<ObservationStoreException> exception = new AtomicReference<>();

        public void setStoreException(ObservationStoreException observationStoreException) {
            this.exception.set(observationStoreException);
        }

        public void setExecutor(ScheduledExecutorService scheduledExecutorService) {
        }

        public Observation putIfAbsent(Token token, Observation observation) {
            if (token == null) {
                throw new NullPointerException("token must not be null");
            }
            if (observation == null) {
                throw new NullPointerException("observation must not be null");
            }
            ObservationStoreException andSet = this.exception.getAndSet(null);
            if (andSet != null) {
                throw andSet;
            }
            return this.map.putIfAbsent(token, observation);
        }

        public Observation put(Token token, Observation observation) {
            if (token == null) {
                throw new NullPointerException("token must not be null");
            }
            if (observation == null) {
                throw new NullPointerException("observation must not be null");
            }
            ObservationStoreException andSet = this.exception.getAndSet(null);
            if (andSet != null) {
                throw andSet;
            }
            return this.map.put(token, observation);
        }

        public Observation get(Token token) {
            if (token == null) {
                return null;
            }
            return ObservationUtil.shallowClone(this.map.get(token));
        }

        public void remove(Token token) {
            if (token != null) {
                this.map.remove(token);
            }
        }

        public void setContext(Token token, EndpointContext endpointContext) {
            Observation observation;
            if (token == null || endpointContext == null || (observation = this.map.get(token)) == null) {
                return;
            }
            this.map.replace(token, observation, new Observation(observation.getRequest(), endpointContext));
        }

        public void start() {
        }

        public void stop() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/eclipse/californium/core/test/ObserveTest$MyResource.class */
    public static class MyResource extends CoapResource {
        private volatile String currentLabel;
        private volatile String currentResponse;
        private AtomicBoolean reject;
        private AtomicReference<CoAP.ResponseCode> responseCode;
        private AtomicInteger counter;
        private AtomicInteger delay;

        public MyResource(String str) {
            super(str);
            this.reject = new AtomicBoolean();
            this.responseCode = new AtomicReference<>();
            this.counter = new AtomicInteger();
            this.delay = new AtomicInteger();
            setObservable(true);
            setObserveType(CoAP.Type.CON);
            prepareResponse(true);
        }

        public void handleGET(CoapExchange coapExchange) {
            Response response;
            int andSet = this.delay.getAndSet(0);
            if (0 < andSet) {
                try {
                    Thread.sleep(andSet);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            if (this.reject.compareAndSet(true, false)) {
                coapExchange.reject();
                return;
            }
            CoAP.ResponseCode andSet2 = this.responseCode.getAndSet(null);
            if (andSet2 != null) {
                response = new Response(andSet2);
            } else {
                response = new Response(CoAP.ResponseCode.CONTENT);
                response.setPayload(this.currentResponse);
            }
            coapExchange.respond(response);
        }

        public void changed() {
            prepareResponse(false);
            super.changed();
        }

        public void changed(String str) {
            this.currentLabel = str;
            changed();
        }

        public void rejectNextGet() {
            this.reject.set(true);
        }

        public void delayNextGet(int i) {
            this.delay.set(i);
        }

        public void prepareResponse(boolean z) {
            int incrementAndGet = this.counter.incrementAndGet();
            if (null == this.currentLabel) {
                this.currentResponse = String.format("\"%s says hi for the %d time\"", getName(), Integer.valueOf(incrementAndGet));
            } else {
                this.currentResponse = String.format("\"%s says %s for the %d time\"", getName(), this.currentLabel, Integer.valueOf(incrementAndGet));
            }
            if (z) {
                LOGGER.debug("Resource {} changed to {}", getName(), this.currentResponse);
            } else {
                LOGGER.info("Resource {} changed to {}", getName(), this.currentResponse);
            }
        }
    }

    /* loaded from: input_file:org/eclipse/californium/core/test/ObserveTest$ServerMessageInterceptor.class */
    private class ServerMessageInterceptor extends MessageInterceptorAdapter {
        private final AtomicInteger resetCounter;

        public ServerMessageInterceptor(ObserveTest observeTest) {
            this(null);
        }

        public ServerMessageInterceptor(AtomicInteger atomicInteger) {
            this.resetCounter = atomicInteger;
        }

        public void receiveEmptyMessage(EmptyMessage emptyMessage) {
            if (emptyMessage.getType() != CoAP.Type.RST || this.resetCounter == null) {
                return;
            }
            ObserveTest.LOGGER.info("Received {}. RST. MID: {}", Integer.valueOf(this.resetCounter.incrementAndGet()), Integer.valueOf(emptyMessage.getMID()));
            emptyMessage.cancel();
        }
    }

    @Before
    public void startupServer() {
        this.cleanup.add(createServer());
    }

    @After
    public void shutdownServer() {
        Endpoint defaultEndpoint = EndpointManager.getEndpointManager().getDefaultEndpoint();
        Iterator it = defaultEndpoint.getInterceptors().iterator();
        while (it.hasNext()) {
            defaultEndpoint.removeInterceptor((MessageInterceptor) it.next());
        }
    }

    @Test
    public void testObserveLifecycle() throws Exception {
        EndpointManager.getEndpointManager().getDefaultEndpoint().addInterceptor(new ClientMessageInterceptor());
        Request newGet = Request.newGet();
        newGet.setURI(this.uriX);
        newGet.setObserve();
        newGet.send();
        Request newGet2 = Request.newGet();
        newGet2.setURI(this.uriY);
        newGet2.setObserve();
        newGet2.send();
        Response waitForResponse = newGet.waitForResponse(1000L);
        Assert.assertNotNull("Client received no response", waitForResponse);
        Assert.assertTrue(waitForResponse.getOptions().hasObserve());
        Assert.assertEquals(1L, this.resourceX.getObserverCount());
        Assert.assertEquals(waitForResponse.getPayloadString(), this.resourceX.currentResponse);
        Response waitForResponse2 = newGet2.waitForResponse(1000L);
        Assert.assertNotNull("Client received no response", waitForResponse2);
        Assert.assertTrue(waitForResponse2.getOptions().hasObserve());
        Assert.assertEquals(1L, this.resourceY.getObserverCount());
        Assert.assertEquals(waitForResponse2.getPayloadString(), this.resourceY.currentResponse);
        LOGGER.info("\nObserve relation established, resource changes");
        Thread.sleep(50L);
        this.resourceX.changed("Lifecycle");
        Assert.assertTrue(this.waitforit.await(1000L, TimeUnit.MILLISECONDS));
        Thread.sleep(500L);
        Assert.assertEquals(0L, this.resourceX.getObserverCount());
        Assert.assertEquals(0L, this.resourceY.getObserverCount());
        this.observations.setStoreException(new ObservationStoreException("test"));
        Request newGet3 = Request.newGet();
        newGet3.setURI(this.uriY);
        newGet3.setObserve();
        newGet3.send();
        Assert.assertNull("Client received unexpected response", newGet3.waitForResponse(1000L));
        Assert.assertNotNull("send error expected", newGet3.getSendError());
    }

    @Test
    public void testObserveClient() throws Exception {
        this.serverEndpoint.addInterceptor(new ServerMessageInterceptor(new AtomicInteger(0)));
        this.resourceX.setObserveType(CoAP.Type.NON);
        CoapClient coapClient = new CoapClient(this.uriX);
        this.cleanup.add(coapClient);
        CountingCoapHandler countingCoapHandler = new CountingCoapHandler();
        CoapObserveRelation observeAndWait = coapClient.observeAndWait(countingCoapHandler);
        Assert.assertTrue(countingCoapHandler.waitOnLoadCalls(1, 1000L, TimeUnit.MILLISECONDS));
        Assert.assertFalse("Relation canceled", observeAndWait.isCanceled());
        Assert.assertNotNull("Response not received", observeAndWait.getCurrent());
        Assert.assertEquals("\"resX says hi for the 1 time\"", observeAndWait.getCurrent().getResponseText());
        observeAndWait.reactiveCancel();
        LOGGER.info("{} reactive canceled", this.uriX);
        for (int i = 0; i < 3; i++) {
            this.resourceX.changed("client");
            Thread.sleep(50L);
        }
        Assert.assertFalse(countingCoapHandler.waitOnLoadCalls(2, 1000L, TimeUnit.MILLISECONDS));
        Assert.assertEquals(3, r0.get());
        Assert.assertEquals(1L, this.resourceX.getObserverCount());
    }

    @Test
    public void testObserveClientDeleteResource() throws Exception {
        this.serverEndpoint.addInterceptor(new ServerMessageInterceptor(this));
        this.resourceX.setObserveType(CoAP.Type.NON);
        CoapClient coapClient = new CoapClient(this.uriX);
        this.cleanup.add(coapClient);
        CountingCoapHandler countingCoapHandler = new CountingCoapHandler();
        CoapObserveRelation observeAndWait = coapClient.observeAndWait(countingCoapHandler);
        Assert.assertTrue(countingCoapHandler.waitOnLoadCalls(1, 1000L, TimeUnit.MILLISECONDS));
        Assert.assertFalse("Relation canceled", observeAndWait.isCanceled());
        Assert.assertNotNull("Response not received", observeAndWait.getCurrent());
        Assert.assertEquals("\"resX says hi for the 1 time\"", observeAndWait.getCurrent().getResponseText());
        this.resourceX.delete();
        LOGGER.info("{} deleted", this.uriX);
        Assert.assertTrue(countingCoapHandler.waitOnLoadCalls(2, 1000L, TimeUnit.MILLISECONDS));
        CoapResponse coapResponse = countingCoapHandler.responses.get(1);
        Assert.assertEquals(CoAP.ResponseCode.NOT_FOUND, coapResponse.getCode());
        Assert.assertEquals(CoAP.Type.CON, coapResponse.advanced().getType());
        CoapResponse current = observeAndWait.getCurrent();
        Assert.assertEquals(CoAP.ResponseCode.NOT_FOUND, current.getCode());
        Assert.assertEquals(CoAP.Type.CON, current.advanced().getType());
    }

    @Test
    public void testObserveDefaultMessageType() throws Exception {
        this.serverEndpoint.addInterceptor(new ServerMessageInterceptor(this));
        this.resourceX.setObserveType(null);
        CoapClient coapClient = new CoapClient(this.uriX);
        this.cleanup.add(coapClient);
        CountingCoapHandler countingCoapHandler = new CountingCoapHandler();
        CoapObserveRelation observeAndWait = coapClient.observeAndWait(countingCoapHandler);
        Assert.assertTrue(countingCoapHandler.waitOnLoadCalls(1, 1000L, TimeUnit.MILLISECONDS));
        Assert.assertFalse("Relation canceled", observeAndWait.isCanceled());
        Assert.assertNotNull("Response not received", observeAndWait.getCurrent());
        Assert.assertEquals("\"resX says hi for the 1 time\"", observeAndWait.getCurrent().getResponseText());
        Assert.assertEquals(CoAP.Type.ACK, observeAndWait.getCurrent().advanced().getType());
        this.resourceX.changed("new");
        Assert.assertTrue(countingCoapHandler.waitOnLoadCalls(2, 1000L, TimeUnit.MILLISECONDS));
        Assert.assertNotNull("Response not received", observeAndWait.getCurrent());
        Assert.assertEquals("\"resX says new for the 2 time\"", observeAndWait.getCurrent().getResponseText());
        Assert.assertEquals(CoAP.Type.NON, observeAndWait.getCurrent().advanced().getType());
    }

    @Test
    public void testObserveNonErrorResponse() throws Exception {
        this.serverEndpoint.addInterceptor(new ServerMessageInterceptor(this));
        this.resourceX.setObserveType(CoAP.Type.NON);
        CoapClient coapClient = new CoapClient(this.uriX);
        this.cleanup.add(coapClient);
        CountingCoapHandler countingCoapHandler = new CountingCoapHandler();
        CoapObserveRelation observeAndWait = coapClient.observeAndWait(countingCoapHandler);
        Assert.assertTrue(countingCoapHandler.waitOnLoadCalls(1, 1000L, TimeUnit.MILLISECONDS));
        Assert.assertFalse("Relation canceled", observeAndWait.isCanceled());
        Assert.assertNotNull("Response not received", observeAndWait.getCurrent());
        Assert.assertEquals("\"resX says hi for the 1 time\"", observeAndWait.getCurrent().getResponseText());
        Assert.assertEquals(CoAP.Type.ACK, observeAndWait.getCurrent().advanced().getType());
        this.resourceX.changed("new");
        Assert.assertTrue(countingCoapHandler.waitOnLoadCalls(2, 1000L, TimeUnit.MILLISECONDS));
        Assert.assertNotNull("Response not received", observeAndWait.getCurrent());
        Assert.assertEquals("\"resX says new for the 2 time\"", observeAndWait.getCurrent().getResponseText());
        Assert.assertEquals(CoAP.Type.NON, observeAndWait.getCurrent().advanced().getType());
        this.resourceX.responseCode.set(CoAP.ResponseCode.PRECONDITION_FAILED);
        this.resourceX.changed();
        Assert.assertTrue(countingCoapHandler.waitOnLoadCalls(3, 1000L, TimeUnit.MILLISECONDS));
        Assert.assertNotNull("Response not received", observeAndWait.getCurrent());
        Assert.assertEquals(CoAP.ResponseCode.PRECONDITION_FAILED, observeAndWait.getCurrent().getCode());
        Assert.assertEquals(CoAP.Type.CON, observeAndWait.getCurrent().advanced().getType());
        Assert.assertTrue("Observation not canceled by error", observeAndWait.isCanceled());
    }

    @Test
    public void testObserveConErrorResponse() throws Exception {
        this.serverEndpoint.addInterceptor(new ServerMessageInterceptor(this));
        this.resourceX.setObserveType(CoAP.Type.CON);
        CoapClient coapClient = new CoapClient(this.uriX);
        this.cleanup.add(coapClient);
        CountingCoapHandler countingCoapHandler = new CountingCoapHandler();
        CoapObserveRelation observeAndWait = coapClient.observeAndWait(countingCoapHandler);
        Assert.assertTrue(countingCoapHandler.waitOnLoadCalls(1, 1000L, TimeUnit.MILLISECONDS));
        Assert.assertFalse("Relation canceled", observeAndWait.isCanceled());
        Assert.assertNotNull("Response not received", observeAndWait.getCurrent());
        Assert.assertEquals("\"resX says hi for the 1 time\"", observeAndWait.getCurrent().getResponseText());
        Assert.assertEquals(CoAP.Type.ACK, observeAndWait.getCurrent().advanced().getType());
        this.resourceX.changed("new");
        Assert.assertTrue(countingCoapHandler.waitOnLoadCalls(2, 1000L, TimeUnit.MILLISECONDS));
        Assert.assertNotNull("Response not received", observeAndWait.getCurrent());
        Assert.assertEquals("\"resX says new for the 2 time\"", observeAndWait.getCurrent().getResponseText());
        Assert.assertEquals(CoAP.Type.CON, observeAndWait.getCurrent().advanced().getType());
        this.resourceX.responseCode.set(CoAP.ResponseCode.PRECONDITION_FAILED);
        this.resourceX.changed();
        Assert.assertTrue(countingCoapHandler.waitOnLoadCalls(3, 1000L, TimeUnit.MILLISECONDS));
        Assert.assertNotNull("Response not received", observeAndWait.getCurrent());
        Assert.assertEquals(CoAP.ResponseCode.PRECONDITION_FAILED, observeAndWait.getCurrent().getCode());
        Assert.assertEquals(CoAP.Type.CON, observeAndWait.getCurrent().advanced().getType());
        Assert.assertTrue("Observation not canceled by error", observeAndWait.isCanceled());
    }

    @Test(expected = IllegalArgumentException.class)
    public void testObserveClearWithSuccessResponse() throws Exception {
        this.serverEndpoint.addInterceptor(new ServerMessageInterceptor(this));
        this.resourceX.setObserveType(CoAP.Type.NON);
        CoapClient coapClient = new CoapClient(this.uriX);
        this.cleanup.add(coapClient);
        CountingCoapHandler countingCoapHandler = new CountingCoapHandler();
        CoapObserveRelation observeAndWait = coapClient.observeAndWait(countingCoapHandler);
        Assert.assertTrue(countingCoapHandler.waitOnLoadCalls(1, 1000L, TimeUnit.MILLISECONDS));
        Assert.assertFalse("Relation canceled", observeAndWait.isCanceled());
        Assert.assertNotNull("Response not received", observeAndWait.getCurrent());
        Assert.assertEquals("\"resX says hi for the 1 time\"", observeAndWait.getCurrent().getResponseText());
        Assert.assertEquals(CoAP.Type.ACK, observeAndWait.getCurrent().advanced().getType());
        this.resourceX.changed("new");
        Assert.assertTrue(countingCoapHandler.waitOnLoadCalls(2, 1000L, TimeUnit.MILLISECONDS));
        Assert.assertNotNull("Response not received", observeAndWait.getCurrent());
        Assert.assertEquals("\"resX says new for the 2 time\"", observeAndWait.getCurrent().getResponseText());
        Assert.assertEquals(CoAP.Type.NON, observeAndWait.getCurrent().advanced().getType());
        this.resourceX.clearAndNotifyObserveRelations(CoAP.ResponseCode.VALID);
    }

    @Test
    public void testObserveClientReregister() throws Exception {
        this.resourceX.setObserveType(CoAP.Type.NON);
        MessageObserverCounterMessageInterceptor messageObserverCounterMessageInterceptor = new MessageObserverCounterMessageInterceptor();
        EndpointManager.getEndpointManager().getDefaultEndpoint().addInterceptor(messageObserverCounterMessageInterceptor);
        CoapClient coapClient = new CoapClient(this.uriX);
        this.cleanup.add(coapClient);
        CountingCoapHandler countingCoapHandler = new CountingCoapHandler();
        CoapObserveRelation observeAndWait = coapClient.observeAndWait(countingCoapHandler);
        Assert.assertTrue(countingCoapHandler.waitOnLoadCalls(1, 1000L, TimeUnit.MILLISECONDS));
        Assert.assertFalse("Response not received", observeAndWait.isCanceled());
        Assert.assertNotNull("Response not received", observeAndWait.getCurrent());
        Assert.assertEquals("\"resX says hi for the 1 time\"", observeAndWait.getCurrent().getResponseText());
        int messageObserverCounter = messageObserverCounterMessageInterceptor.getMessageObserverCounter();
        this.resourceX.changed("client");
        Assert.assertTrue(countingCoapHandler.waitOnLoadCalls(2, 1000L, TimeUnit.MILLISECONDS));
        Assert.assertFalse(observeAndWait.isCanceled());
        Assert.assertEquals("\"resX says client for the 2 time\"", observeAndWait.getCurrent().getResponseText());
        observeAndWait.reregister();
        Assert.assertFalse(observeAndWait.isCanceled());
        Assert.assertTrue(countingCoapHandler.waitOnLoadCalls(3, 1000L, TimeUnit.MILLISECONDS));
        LOGGER.info("{} reregistered", this.uriX);
        Assert.assertFalse(observeAndWait.isCanceled());
        Assert.assertEquals("\"resX says client for the 2 time\"", observeAndWait.getCurrent().getResponseText());
        this.resourceX.changed("new client");
        Assert.assertTrue(countingCoapHandler.waitOnLoadCalls(4, 1000L, TimeUnit.MILLISECONDS));
        Assert.assertEquals("\"resX says new client for the 3 time\"", observeAndWait.getCurrent().getResponseText());
        Assert.assertEquals(1L, this.resourceX.getObserverCount());
        Assert.assertEquals("message observer leak", messageObserverCounter, messageObserverCounterMessageInterceptor.getMessageObserverCounter());
    }

    @Test(expected = IllegalStateException.class)
    public void testObserveClientReregisterAfterReject() throws Exception {
        this.resourceX.setObserveType(CoAP.Type.NON);
        this.resourceX.rejectNextGet();
        CoapClient coapClient = new CoapClient(this.uriX);
        this.cleanup.add(coapClient);
        CoapObserveRelation observeAndWait = coapClient.observeAndWait(new CountingCoapHandler());
        Assert.assertTrue("Not rejected", observeAndWait.isCanceled());
        observeAndWait.reregister();
    }

    @Test(expected = IllegalStateException.class)
    public void testObserveClientReregisterAfterTimeout() throws Exception {
        this.resourceX.setObserveType(CoAP.Type.NON);
        this.resourceX.delayNextGet(100);
        CoapClient coapClient = new CoapClient(this.uriX);
        this.cleanup.add(coapClient);
        coapClient.setTimeout(1L);
        CoapObserveRelation observeAndWait = coapClient.observeAndWait(new CountingCoapHandler());
        Assert.assertTrue("No timeout", observeAndWait.isCanceled());
        coapClient.setTimeout((Long) null);
        observeAndWait.reregister();
    }

    @Test
    public void testObserveClientReregisterBeforeTimeout() throws Exception {
        this.resourceX.setObserveType(CoAP.Type.NON);
        this.resourceX.delayNextGet(100);
        CoapClient coapClient = new CoapClient(this.uriX);
        this.cleanup.add(coapClient);
        CountingCoapHandler countingCoapHandler = new CountingCoapHandler();
        CoapObserveRelation observe = coapClient.observe(countingCoapHandler);
        Assert.assertFalse("Timeout", observe.isCanceled());
        Assert.assertFalse("reregister not ignored", observe.reregister());
        Assert.assertTrue(countingCoapHandler.waitOnLoadCalls(1, 1000L, TimeUnit.MILLISECONDS));
        this.resourceX.changed("client");
        Assert.assertTrue(countingCoapHandler.waitOnLoadCalls(2, 1000L, TimeUnit.MILLISECONDS));
        Assert.assertFalse("Response not received", observe.isCanceled());
    }

    @Test
    public void testObserveClientReregisterAfterReregister() throws Exception {
        this.resourceX.setObserveType(CoAP.Type.NON);
        CoapClient coapClient = new CoapClient(this.uriX);
        this.cleanup.add(coapClient);
        CountingCoapHandler countingCoapHandler = new CountingCoapHandler();
        CoapObserveRelation observeAndWait = coapClient.observeAndWait(countingCoapHandler);
        Assert.assertFalse("Response not received", observeAndWait.isCanceled());
        Assert.assertTrue(countingCoapHandler.waitOnLoadCalls(1, 1000L, TimeUnit.MILLISECONDS));
        this.resourceX.delayNextGet(100);
        Assert.assertTrue("reregister not triggered", observeAndWait.reregister());
        Assert.assertFalse("reregister not ignored", observeAndWait.reregister());
        Assert.assertTrue(countingCoapHandler.waitOnLoadCalls(2, 1000L, TimeUnit.MILLISECONDS));
        this.resourceX.changed("client");
        Assert.assertTrue(countingCoapHandler.waitOnLoadCalls(3, 1000L, TimeUnit.MILLISECONDS));
        Assert.assertFalse("Response not received", observeAndWait.isCanceled());
    }

    @Test(expected = IllegalArgumentException.class)
    public void testObserveAndWaitExceptionHandling() throws Exception {
        CoapClient coapClient = new CoapClient(this.uriX);
        this.cleanup.add(coapClient);
        coapClient.observeAndWait(Request.newGet().setURI(this.uriX), new CountingCoapHandler());
    }

    @Test(expected = IllegalArgumentException.class)
    public void testObserveExceptionHandling() throws Exception {
        CoapClient coapClient = new CoapClient(this.uriX);
        this.cleanup.add(coapClient);
        coapClient.observe(Request.newGet().setURI(this.uriX), new CountingCoapHandler());
    }

    private CoapServer createServer() {
        Configuration configuration = network.createTestConfig().set(CoapConfig.ACK_TIMEOUT, 200, TimeUnit.MILLISECONDS).set(CoapConfig.ACK_INIT_RANDOM, Float.valueOf(1.0f)).set(CoapConfig.ACK_TIMEOUT_SCALE, Float.valueOf(1.0f));
        CoapEndpoint.Builder builder = new CoapEndpoint.Builder();
        builder.setInetSocketAddress(TestTools.LOCALHOST_EPHEMERAL);
        builder.setConfiguration(configuration);
        this.serverEndpoint = builder.build();
        this.serverEndpoint.addInterceptor(new MessageTracer());
        int incrementAndGet = counter.incrementAndGet();
        CoapServer coapServer = new CoapServer(configuration, new int[0]);
        coapServer.setExecutors(ExecutorsUtil.newScheduledThreadPool(((Integer) configuration.get(CoapConfig.PROTOCOL_STAGE_THREAD_COUNT)).intValue(), new NamedThreadFactory("CoapServer(main):" + incrementAndGet + "#")), ExecutorsUtil.newDefaultSecondaryScheduler("CoapServer(secondary):" + incrementAndGet + "#"), false);
        coapServer.addEndpoint(this.serverEndpoint);
        this.resourceX = new MyResource(TARGET_X);
        this.resourceY = new MyResource(TARGET_Y);
        coapServer.add(new Resource[]{this.resourceX});
        coapServer.add(new Resource[]{this.resourceY});
        coapServer.start();
        this.uriX = TestTools.getUri((Endpoint) this.serverEndpoint, TARGET_X);
        this.uriY = TestTools.getUri((Endpoint) this.serverEndpoint, TARGET_Y);
        this.observations = new MyObservationStore();
        CoapEndpoint.Builder builder2 = new CoapEndpoint.Builder();
        builder2.setInetSocketAddress(TestTools.LOCALHOST_EPHEMERAL);
        builder2.setConfiguration(configuration);
        builder2.setObservationStore(this.observations);
        EndpointManager.getEndpointManager().setDefaultEndpoint(builder2.build());
        return coapServer;
    }
}
