/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pig.pen;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.IdentityHashSet;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.logical.relational.LOCogroup;
import org.apache.pig.newplan.logical.relational.LOCross;
import org.apache.pig.newplan.logical.relational.LODistinct;
import org.apache.pig.newplan.logical.relational.LOFilter;
import org.apache.pig.newplan.logical.relational.LOForEach;
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LOLimit;
import org.apache.pig.newplan.logical.relational.LOLoad;
import org.apache.pig.newplan.logical.relational.LOSort;
import org.apache.pig.newplan.logical.relational.LOSplit;
import org.apache.pig.newplan.logical.relational.LOSplitOutput;
import org.apache.pig.newplan.logical.relational.LOStore;
import org.apache.pig.newplan.logical.relational.LOUnion;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
import org.apache.pig.pen.ExampleGenerator;
import org.apache.pig.pen.util.ExampleTuple;
import org.apache.pig.pen.util.LineageTracer;
import org.apache.pig.pen.util.MetricEvaluation;
import org.apache.pig.pen.util.PreOrderDepthFirstWalker;

/**
 * Trims the example base data produced by an {@link ExampleGenerator} while walking a logical plan.
 *
 * <p>Lineage and affinity-group information is seeded for the plan's sinks in {@link #init()} and is
 * handed to each operator's predecessors as that operator is visited. At every visited operator the
 * base data is pruned to a greedy cover of the operator's affinity groups. The pruned data is kept
 * only if the overall completeness reported by {@link MetricEvaluation} does not drop. As soon as a
 * prune is rejected, {@link #continueTrimming} becomes {@code false} and every later visit is a no-op.
 */
public class LineageTrimmingVisitor
extends LogicalRelationalNodesVisitor {
    LogicalPlan plan = null;
    /** Current example input of each load; replaced entry-by-entry whenever a trim is accepted. */
    Map<LOLoad, DataBag> baseData;
    /** Bag already chosen for a file, so that other loads of the same {@link FileSpec} reuse it. */
    Map<FileSpec, DataBag> inputToDataMap;
    Map<Operator, PhysicalOperator> LogToPhyMap = null;
    PhysicalPlan physPlan = null;
    /** Best overall completeness reached so far; a trim is accepted only if it does not go below this. */
    double completeness = 100.0;
    Log log = LogFactory.getLog(this.getClass());
    /** Affinity groups (equivalence classes) registered for operators that have not been visited yet. */
    Map<Operator, Collection<IdentityHashSet<Tuple>>> AffinityGroups = new HashMap<Operator, Collection<IdentityHashSet<Tuple>>>();
    /** Lineage tracers registered for operators that have not been visited yet. */
    Map<Operator, LineageTracer> Lineage = new HashMap<Operator, LineageTracer>();
    /** Cleared permanently once a trim would lower completeness. */
    boolean continueTrimming;
    PigContext pc;
    private ExampleGenerator eg;

    public LineageTrimmingVisitor(LogicalPlan plan, Map<LOLoad, DataBag> baseData, ExampleGenerator eg, Map<Operator, PhysicalOperator> LogToPhyMap, PhysicalPlan physPlan, PigContext pc) throws IOException, InterruptedException {
        super(plan, new PreOrderDepthFirstWalker(plan));
        this.baseData = baseData;
        this.plan = plan;
        this.LogToPhyMap = LogToPhyMap;
        this.pc = pc;
        this.physPlan = physPlan;
        this.eg = eg;
        this.inputToDataMap = new HashMap<FileSpec, DataBag>();
        this.init();
    }

    /**
     * Seeds every sink with the generator's lineage and equivalence classes, and records the initial
     * overall completeness. Also re-reads the logical-to-physical map from the generator, replacing
     * the one passed to the constructor.
     */
    public void init() throws IOException, InterruptedException {
        Map<Operator, DataBag> data = this.eg.getData();
        LineageTracer lineage = this.eg.getLineage();
        Map<LogicalRelationalOperator, Collection<IdentityHashSet<Tuple>>> opToEqClasses = this.eg.getLoToEqClassMap();
        for (Operator leaf : this.plan.getSinks()) {
            this.Lineage.put(leaf, lineage);
            this.AffinityGroups.put(leaf, this.eg.getEqClasses());
        }
        this.completeness = MetricEvaluation.getCompleteness(null, data, opToEqClasses, true);
        this.LogToPhyMap = this.eg.getLogToPhyMap();
        this.continueTrimming = true;
    }

    /**
     * A COGROUP whose first successor is a FOREACH is skipped here: {@link #processOperator} handles
     * it when that FOREACH is visited, registering the COGROUP's inputs directly.
     */
    @Override
    public void visit(LOCogroup cg) throws FrontendException {
        List<Operator> successors = this.plan.getSuccessors(cg);
        if (successors != null && successors.get(0) instanceof LOForEach) {
            return;
        }
        if (!this.continueTrimming) {
            return;
        }
        try {
            this.continueTrimming = this.checkCompleteness(cg);
            for (Operator input : cg.getInputs(this.plan)) {
                this.Lineage.put(input, this.eg.getLineage());
                this.AffinityGroups.put(input, this.eg.getEqClasses());
            }
        }
        catch (FrontendException e) {
            throw e;
        }
        catch (Exception e) {
            // Keep the original failure as the cause instead of flattening it to its message.
            throw new FrontendException("Exception : " + e.getMessage(), e);
        }
    }

    @Override
    public void visit(LOJoin join) throws FrontendException {
        if (this.continueTrimming) {
            this.processOperator(join);
        }
    }

    @Override
    public void visit(LOCross cs) throws FrontendException {
        if (this.continueTrimming) {
            this.processOperator(cs);
        }
    }

    @Override
    public void visit(LODistinct dt) throws FrontendException {
        if (this.continueTrimming) {
            this.processOperator(dt);
        }
    }

    @Override
    public void visit(LOFilter filter) throws FrontendException {
        if (this.continueTrimming) {
            this.processOperator(filter);
        }
    }

    @Override
    public void visit(LOStore store) throws FrontendException {
        if (this.continueTrimming) {
            this.processOperator(store);
        }
    }

    @Override
    public void visit(LOForEach forEach) throws FrontendException {
        if (this.continueTrimming) {
            this.processOperator(forEach);
        }
    }

    @Override
    public void visit(LOLimit limOp) throws FrontendException {
        if (this.continueTrimming) {
            this.processOperator(limOp);
        }
    }

    @Override
    public void visit(LOLoad load) throws FrontendException {
        if (this.continueTrimming) {
            this.processOperator(load);
        }
    }

    @Override
    public void visit(LOSort s) throws FrontendException {
        if (this.continueTrimming) {
            this.processOperator(s);
        }
    }

    @Override
    public void visit(LOSplit split) throws FrontendException {
        if (this.continueTrimming) {
            this.processOperator(split);
        }
    }

    @Override
    public void visit(LOSplitOutput split) throws FrontendException {
        if (this.continueTrimming) {
            this.processOperator(split);
        }
    }

    @Override
    public void visit(LOUnion u) throws FrontendException {
        if (this.continueTrimming) {
            this.processOperator(u);
        }
    }

    /**
     * Builds a pruned copy of {@code baseData} that keeps only the members of a greedily chosen set
     * of lineage groups covering every equivalence class.
     *
     * <p>Each round picks the lineage group that covers the most still-uncovered equivalence classes.
     * Ties go to the smaller weight from {@link LineageTracer#getWeightedCounts}. The classes that
     * group covers are then removed from every candidate.
     *
     * @param baseData           current example input per load (not modified)
     * @param lineage            tracer mapping tuples to their lineage-group representative
     * @param equivalenceClasses affinity groups that the pruned data must still cover
     * @return a new map from each load to a new bag holding only the retained tuples
     */
    private Map<LOLoad, DataBag> pruneBaseDataConstrainedCoverage(Map<LOLoad, DataBag> baseData, LineageTracer lineage, Collection<IdentityHashSet<Tuple>> equivalenceClasses) {
        IdentityHashMap<Tuple, Collection<Tuple>> membershipMap = lineage.getMembershipMap();
        IdentityHashMap<Tuple, Double> lineageGroupWeights = lineage.getWeightedCounts(2.0f, 1.0f);

        // Invert the classes: for each lineage group, the equivalence classes it touches.
        IdentityHashMap<Tuple, HashSet<IdentityHashSet<Tuple>>> lineageGroupToEquivClasses = new IdentityHashMap<Tuple, HashSet<IdentityHashSet<Tuple>>>();
        for (IdentityHashSet<Tuple> equivClass : equivalenceClasses) {
            for (Tuple t : equivClass) {
                Tuple lineageGroup = lineage.getRepresentative(t);
                HashSet<IdentityHashSet<Tuple>> entry = lineageGroupToEquivClasses.get(lineageGroup);
                if (entry == null) {
                    entry = new HashSet<IdentityHashSet<Tuple>>();
                    lineageGroupToEquivClasses.put(lineageGroup, entry);
                }
                entry.add(equivClass);
            }
        }

        IdentityHashSet<Tuple> selectedLineageGroups = new IdentityHashSet<Tuple>();
        while (!lineageGroupToEquivClasses.isEmpty()) {
            double bestWeight = -1.0;
            Tuple bestLineageGroup = null;
            Set<IdentityHashSet<Tuple>> bestEquivClassesCovered = null;
            int bestNumEquivClassesCovered = 0;
            // A chosen group's own class set always empties out below, so its entry is removed in the
            // same round. Candidates are therefore never groups that were already selected.
            for (Map.Entry<Tuple, HashSet<IdentityHashSet<Tuple>>> candidate : lineageGroupToEquivClasses.entrySet()) {
                double weight = lineageGroupWeights.get(candidate.getKey());
                Set<IdentityHashSet<Tuple>> equivClassesCovered = candidate.getValue();
                int numEquivClassesCovered = equivClassesCovered.size();
                boolean better = numEquivClassesCovered > bestNumEquivClassesCovered
                        || (numEquivClassesCovered == bestNumEquivClassesCovered && weight < bestWeight);
                if (better) {
                    bestWeight = weight;
                    bestLineageGroup = candidate.getKey();
                    bestNumEquivClassesCovered = numEquivClassesCovered;
                    bestEquivClassesCovered = equivClassesCovered;
                }
            }
            selectedLineageGroups.add(bestLineageGroup);

            // Copy first: bestEquivClassesCovered is one of the map's values and is emptied below.
            Set<IdentityHashSet<Tuple>> covered = new HashSet<IdentityHashSet<Tuple>>(bestEquivClassesCovered);
            Iterator<Map.Entry<Tuple, HashSet<IdentityHashSet<Tuple>>>> groups = lineageGroupToEquivClasses.entrySet().iterator();
            while (groups.hasNext()) {
                HashSet<IdentityHashSet<Tuple>> equivClasses = groups.next().getValue();
                equivClasses.removeAll(covered);
                if (equivClasses.isEmpty()) {
                    groups.remove();
                }
            }
        }

        IdentityHashSet<Tuple> tuplesToRetain = new IdentityHashSet<Tuple>();
        for (Tuple lineageGroup : selectedLineageGroups) {
            for (Tuple t : membershipMap.get(lineageGroup)) {
                tuplesToRetain.add(t);
            }
        }

        Map<LOLoad, DataBag> newBaseData = new HashMap<LOLoad, DataBag>();
        for (Map.Entry<LOLoad, DataBag> entry : baseData.entrySet()) {
            DataBag retained = BagFactory.getInstance().newDefaultBag();
            for (Tuple t : entry.getValue()) {
                // Identity membership: keep exactly the tuple objects in the selected groups.
                if (tuplesToRetain.contains(t)) {
                    retained.add(t);
                }
            }
            newBaseData.put(entry.getKey(), retained);
        }
        return newBaseData;
    }

    /**
     * Trims the data of one load. Starting from an empty bag, non-synthetic tuples are offered first
     * (see {@link #checkNewBaseData}). Synthetic tuples are offered only if the non-synthetic ones did
     * not restore completeness. Every load of the same {@link FileSpec} shares the resulting bag, and
     * later loads of that file reuse it.
     */
    private void processLoad(LOLoad ld) throws FrontendException {
        FileSpec fileSpec = ld.getFileSpec();
        DataBag cached = this.inputToDataMap.get(fileSpec);
        if (cached != null) {
            this.baseData.put(ld, cached);
            return;
        }
        DataBag data = this.baseData.get(ld);
        if (data == null || data.size() < 2L) {
            return;
        }
        HashSet<Tuple> realData = new HashSet<Tuple>();
        HashSet<Tuple> syntheticData = new HashSet<Tuple>();
        for (Tuple t : data) {
            // NOTE(review): assumes every base-data tuple is an ExampleTuple; otherwise this cast fails.
            if (((ExampleTuple)t).synthetic) {
                syntheticData.add(t);
            } else {
                realData.add(t);
            }
        }
        HashMap<LOLoad, DataBag> newBaseData = new HashMap<LOLoad, DataBag>();
        DataBag newData = BagFactory.getInstance().newDefaultBag();
        newBaseData.put(ld, newData);
        for (Map.Entry<LOLoad, DataBag> entry : this.baseData.entrySet()) {
            LOLoad other = entry.getKey();
            if (other == ld) {
                continue;
            }
            // Other loads of the same file share the bag being rebuilt; the rest keep their data.
            newBaseData.put(other, other.getFileSpec().equals(fileSpec) ? newData : entry.getValue());
        }
        if (this.checkNewBaseData(newData, newBaseData, realData)) {
            this.checkNewBaseData(newData, newBaseData, syntheticData);
        }
        this.inputToDataMap.put(fileSpec, this.baseData.get(ld));
    }

    /**
     * Adds candidate tuples to {@code data} to find a small addition that restores completeness.
     *
     * <p>Pass 1 adds tuples one at a time and scores each prefix, stopping at the first prefix that
     * reaches the current completeness. The scored tuples are then sorted by descending score. Pass 2
     * restarts from the original contents of {@code data} and adds the tuples in that order. The
     * first time completeness reaches the current value, {@code newBaseData} is committed into
     * {@link #baseData}.
     *
     * @param data        bag being rebuilt; it is also a value of {@code newBaseData}
     * @param newBaseData candidate base data evaluated after each addition
     * @param loadData    tuples that may be added
     * @return {@code false} if a commit happened, {@code true} if completeness was never restored
     *         (in that case {@code data} keeps every tried tuple)
     */
    private boolean checkNewBaseData(DataBag data, Map<LOLoad, DataBag> newBaseData, Set<Tuple> loadData) throws FrontendException {
        LinkedList<Pair<Tuple, Double>> sortedBase = new LinkedList<Pair<Tuple, Double>>();
        DataBag oldData = BagFactory.getInstance().newDefaultBag();
        oldData.addAll(data);
        double target = this.completeness;
        for (Tuple tuple : loadData) {
            data.add(tuple);
            double newCompleteness = this.deriveCompleteness(newBaseData);
            sortedBase.add(new Pair<Tuple, Double>(tuple, newCompleteness));
            if (newCompleteness >= target) {
                break;
            }
        }
        Collections.sort(sortedBase, new Comparator<Pair<Tuple, Double>>(){

            @Override
            public int compare(Pair<Tuple, Double> o1, Pair<Tuple, Double> o2) {
                // Descending by score. Compare the values, not the Double references.
                return Double.compare(o2.second, o1.second);
            }
        });
        data.clear();
        data.addAll(oldData);
        for (Pair<Tuple, Double> pair : sortedBase) {
            data.add(pair.first);
            double newCompleteness = this.deriveCompleteness(newBaseData);
            if (newCompleteness >= this.completeness) {
                this.completeness = newCompleteness;
                this.baseData.putAll(newBaseData);
                return false;
            }
        }
        return true;
    }

    /** Derives all operators' example data from {@code candidateBaseData} and returns its overall completeness. */
    private double deriveCompleteness(Map<LOLoad, DataBag> candidateBaseData) throws FrontendException {
        Map<Operator, DataBag> derivedData;
        try {
            derivedData = this.eg.getData(candidateBaseData);
        }
        catch (Exception e) {
            throw new FrontendException("Exception: " + e.getMessage(), e);
        }
        return MetricEvaluation.getCompleteness(null, derivedData, this.eg.getLoToEqClassMap(), true);
    }

    /**
     * Shared handling for every non-COGROUP visit. Loads go to {@link #processLoad}. Any other
     * operator is trimmed via {@link #checkCompleteness}; while trimming continues, the generator's
     * current equivalence classes and lineage are registered for its predecessors. When the
     * operator is a FOREACH fed by a COGROUP, they are registered for the COGROUP's inputs instead.
     */
    private void processOperator(LogicalRelationalOperator op) throws FrontendException {
        try {
            if (op instanceof LOLoad) {
                this.processLoad((LOLoad)op);
                return;
            }
            this.continueTrimming = this.checkCompleteness(op);
            List<Operator> predecessors = this.plan.getPredecessors(op);
            if (predecessors == null || predecessors.isEmpty() || !this.continueTrimming) {
                return;
            }
            List<? extends Operator> targets = predecessors;
            Operator childOp = predecessors.get(0);
            if (op instanceof LOForEach && childOp instanceof LOCogroup) {
                // visit(LOCogroup) skips a COGROUP feeding a FOREACH, so go straight to its inputs.
                targets = ((LOCogroup)childOp).getInputs(this.plan);
            }
            for (Operator target : targets) {
                this.AffinityGroups.put(target, this.eg.getEqClasses());
                this.Lineage.put(target, this.eg.getLineage());
            }
        }
        catch (Exception e) {
            this.log.error("Lineage trimming failed at " + op.getClass().getSimpleName(), e);
            if (e instanceof FrontendException) {
                // Already carries context; wrapping again only duplicates the "Exception: " prefix.
                throw (FrontendException)e;
            }
            throw new FrontendException("Exception: " + e.getMessage(), e);
        }
    }

    /**
     * Consumes the lineage/affinity groups registered for {@code op} and tries the pruned base data.
     * The prune is committed if completeness does not decrease; otherwise trimming is switched off.
     *
     * @return the updated value of {@link #continueTrimming}
     */
    private boolean checkCompleteness(LogicalRelationalOperator op) throws Exception {
        LineageTracer lineage = this.Lineage.remove(op);
        Collection<IdentityHashSet<Tuple>> affinityGroups = this.AffinityGroups.remove(op);
        Map<LOLoad, DataBag> newBaseData = this.pruneBaseDataConstrainedCoverage(this.baseData, lineage, affinityGroups);
        Map<Operator, DataBag> derivedData = this.eg.getData(newBaseData);
        double newCompleteness = MetricEvaluation.getCompleteness(null, derivedData, this.eg.getLoToEqClassMap(), true);
        if (newCompleteness >= this.completeness) {
            this.completeness = newCompleteness;
            this.baseData.putAll(newBaseData);
        } else {
            this.continueTrimming = false;
        }
        return this.continueTrimming;
    }

    Map<LOLoad, DataBag> getBaseData() {
        return this.baseData;
    }
}

