Class IRDAG

  • All Implemented Interfaces:
    Serializable, DAGInterface<IRVertex, IREdge>

    @NotThreadSafe
    public final class IRDAG
    extends Object
    implements DAGInterface<IRVertex, IREdge>
    An IRDAG object captures a high-level data processing application (e.g., a Spark/Beam application).
    - IRVertex: a data-parallel operation (e.g., map).
    - IREdge: a data dependency between two operations (e.g., shuffle).

    Two broad types of IRDAG optimization (modification) methods are provided, and all of them preserve application semantics:
    - Annotation: setProperty() and getPropertyValue() on each IRVertex/IREdge.
    - Reshaping: insert() and delete() on the IRDAG.
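    The annotation half of this split can be pictured with a minimal sketch. AnnotatedElement and its string-keyed property map are hypothetical simplifications for illustration, not Nemo classes; only the method names setProperty() and getPropertyValue() come from the description above, and returning an empty Optional for an unset property is an assumption of the sketch. Annotating changes what is known about an element, never the DAG's shape.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for an IRVertex/IREdge: only the annotation map is modeled.
final class AnnotatedElement {
  private final Map<String, Object> properties = new HashMap<>();

  // Annotation: attach or overwrite a property; no vertex or edge is added or removed.
  void setProperty(String key, Object value) {
    properties.put(key, value);
  }

  // Returns an empty Optional when the property was never set.
  Optional<Object> getPropertyValue(String key) {
    return Optional.ofNullable(properties.get(key));
  }

  public static void main(String[] args) {
    AnnotatedElement mapVertex = new AnnotatedElement();
    mapVertex.setProperty("Parallelism", 4);
    System.out.println(mapVertex.getPropertyValue("Parallelism").orElse(1)); // prints 4
    System.out.println(mapVertex.getPropertyValue("Missing").isPresent());  // prints false
  }
}
```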

    TODO #341: Rethink IRDAG insert() signatures

    See Also:
    Serialized Form
    • Constructor Detail

      • IRDAG

        public IRDAG(DAG<IRVertex, IREdge> originalUserApplicationDAG)
        Parameters:
        originalUserApplicationDAG - the initial DAG.
    • Method Detail

      • advanceDAGSnapshot

        public boolean advanceDAGSnapshot(BiFunction<IRDAG, IRDAG, Boolean> checker)
        Used internally by Nemo to advance the DAG snapshot after applying each pass.
        Parameters:
        checker - that compares the dagSnapshot and the modifiedDAG to determine whether the snapshot can be set to the current modifiedDAG.
        Returns:
        true if the checker passes, false otherwise.
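        A minimal sketch of the snapshot-advancing contract described above, assuming a hypothetical generic SnapshotSketch<D> class in which D stands in for the DAG type. The checker receives the snapshot and the modified DAG, and the snapshot moves forward only when the checker returns true. What the real method does with a rejected modifiedDAG is not stated here, so the sketch simply keeps the old snapshot. The example checker (no vertex may be lost) is made up for illustration.

```java
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical snapshot holder; D stands in for the DAG type.
final class SnapshotSketch<D> {
  private D dagSnapshot; // last version accepted by a checker
  private D modifiedDAG; // version being changed by the current pass

  SnapshotSketch(D initial) {
    this.dagSnapshot = initial;
    this.modifiedDAG = initial;
  }

  // A pass replaces the working copy.
  void applyPass(D result) {
    this.modifiedDAG = result;
  }

  // The checker compares (dagSnapshot, modifiedDAG); the snapshot advances only if it passes.
  // On failure this sketch keeps the old snapshot and leaves the working copy as is.
  boolean advanceDAGSnapshot(BiFunction<D, D, Boolean> checker) {
    if (checker.apply(dagSnapshot, modifiedDAG)) {
      dagSnapshot = modifiedDAG;
      return true;
    }
    return false;
  }

  D snapshot() {
    return dagSnapshot;
  }

  public static void main(String[] args) {
    SnapshotSketch<List<String>> s = new SnapshotSketch<>(List.of("read", "map"));
    BiFunction<List<String>, List<String>, Boolean> noVertexLost =
        (before, after) -> after.containsAll(before);
    s.applyPass(List.of("read", "relay", "map"));
    System.out.println(s.advanceDAGSnapshot(noVertexLost)); // prints true
    s.applyPass(List.of("read"));
    System.out.println(s.advanceDAGSnapshot(noVertexLost)); // prints false
  }
}
```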
      • irDAGSummary

        public String irDAGSummary()
        Returns:
        an IR DAG summary string, consisting only of the vertices generated by the frontend.
      • delete

        public void delete(IRVertex vertexToDelete)
        Deletes a previously inserted utility vertex (e.g., TriggerVertex, RelayVertex, SamplingVertex).

        Note that this call may delete more than one vertex: it rolls back the changes made by the corresponding previous insert(), while preserving application semantics.

        Parameters:
        vertexToDelete - to delete.
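        The "more than one vertex" behavior can be sketched by recording, for each insert() call, the full group of utility vertices it added; deleting any member then removes the whole group and every edge touching it. GroupDeleteSketch, recordInsert(), and the string vertex ids are hypothetical. The sketch models the TriggerVertex case, where the original edges were never modified; rolling back a RelayVertex would additionally require restoring the bypassed edge, which is omitted here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model: every utility vertex remembers all vertices added by the same insert() call.
final class GroupDeleteSketch {
  static final class Edge {
    final String src;
    final String dst;

    Edge(String src, String dst) {
      this.src = src;
      this.dst = dst;
    }
  }

  final Set<String> vertices = new LinkedHashSet<>();
  final List<Edge> edges = new ArrayList<>();
  private final Map<String, Set<String>> insertedWith = new HashMap<>();

  void addEdge(String src, String dst) {
    edges.add(new Edge(src, dst));
  }

  // Records that the given vertices were added together by one insert() call.
  void recordInsert(Set<String> group) {
    vertices.addAll(group);
    for (String v : group) {
      insertedWith.put(v, group);
    }
  }

  // Deleting any member removes the whole group and all edges touching it.
  void delete(String vertexToDelete) {
    Set<String> group = insertedWith.get(vertexToDelete);
    if (group == null) {
      throw new IllegalArgumentException(vertexToDelete + " is not an inserted utility vertex");
    }
    for (String v : group) {
      vertices.remove(v);
      insertedWith.remove(v);
      edges.removeIf(e -> e.src.equals(v) || e.dst.equals(v));
    }
  }

  public static void main(String[] args) {
    GroupDeleteSketch dag = new GroupDeleteSketch();
    dag.vertices.add("src");
    dag.vertices.add("dst");
    dag.addEdge("src", "dst");
    dag.recordInsert(new LinkedHashSet<>(List.of("trigger", "aggregator")));
    dag.addEdge("src", "trigger");
    dag.addEdge("trigger", "aggregator");
    dag.addEdge("aggregator", "dst");
    dag.delete("trigger"); // also removes "aggregator"
    System.out.println(dag.vertices + " " + dag.edges.size()); // prints [src, dst] 1
  }
}
```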
      • insert

        public void insert(RelayVertex relayVertex,
                           IREdge edgeToStreamize)
        Inserts a new vertex that streams data.

        Before: src - edgeToStreamize - dst
        After:  src - edgeToStreamizeWithNewDestination - relayVertex - oneToOneEdge - dst
        (the "After" relationships replace the "Before" relationships)

        This preserves semantics as the relayVertex simply forwards data elements from the input edge to the output edge.

        Parameters:
        relayVertex - to insert.
        edgeToStreamize - to modify.
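        The Before/After rewiring can be sketched on a plain adjacency list. RelayInsertSketch and insertRelay() are hypothetical names; edge types such as oneToOneEdge, and the data forwarding done by the relay at run time, are not modeled. The point is that the original src-to-dst edge disappears and the relay now sits on the only path between the two vertices.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical adjacency-list DAG: vertex id -> ids of its children, in insertion order.
final class RelayInsertSketch {
  final Map<String, List<String>> children = new LinkedHashMap<>();

  void addEdge(String src, String dst) {
    children.computeIfAbsent(src, k -> new ArrayList<>()).add(dst);
    children.computeIfAbsent(dst, k -> new ArrayList<>());
  }

  // Before: src - dst. After: src - relay - dst.
  // The original edge is replaced, and the relay only forwards elements.
  void insertRelay(String relay, String src, String dst) {
    List<String> out = children.get(src);
    if (out == null || !out.remove(dst)) {
      throw new IllegalArgumentException("no edge " + src + " -> " + dst);
    }
    addEdge(src, relay);
    addEdge(relay, dst);
  }

  public static void main(String[] args) {
    RelayInsertSketch dag = new RelayInsertSketch();
    dag.addEdge("src", "dst");
    dag.insertRelay("relay", "src", "dst");
    System.out.println(dag.children); // prints {src=[relay], dst=[], relay=[dst]}
  }
}
```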
      • insert

        public void insert(TriggerVertex triggerVertex,
                           MessageAggregatorVertex messageAggregatorVertex,
                           EncoderProperty triggerOutputEncoder,
                           DecoderProperty triggerOutputDecoder,
                           Set<IREdge> edgesToGetStatisticsOf,
                           Set<IREdge> edgesToOptimize)
        Inserts a new vertex that analyzes intermediate data, and triggers a dynamic optimization.

        For each edge in edgesToGetStatisticsOf...

        Before: src - edge - dst
        After:  src - oneToOneEdge (a clone of edge) - triggerVertex - shuffleEdge - messageAggregatorVertex - broadcastEdge - dst
        (the "Before" relationships are left unmodified)

        This preserves semantics as the results of the inserted message vertices are never consumed by the original IRDAG.

        TODO #345: Simplify insert(TriggerVertex)

        Parameters:
        triggerVertex - to insert.
        messageAggregatorVertex - to insert.
        triggerOutputEncoder - to use.
        triggerOutputDecoder - to use.
        edgesToGetStatisticsOf - to examine.
        edgesToOptimize - to optimize.
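        The side branch added for one statistics edge can be sketched as follows. TriggerInsertSketch is hypothetical, edge types are plain string labels copied from the diagram above, and the sketch assumes (as the diagram draws it) that the broadcast edge targets the same dst as the examined edge; encoders, decoders, and edgesToOptimize are not modeled. Because the original edge object stays in the list, the existing data path is unchanged.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical edge list; edge "types" are plain labels taken from the diagram.
final class TriggerInsertSketch {
  static final class Edge {
    final String src;
    final String dst;
    final String type;

    Edge(String src, String dst, String type) {
      this.src = src;
      this.dst = dst;
      this.type = type;
    }

    @Override
    public String toString() {
      return src + " -" + type + "-> " + dst;
    }
  }

  final List<Edge> edges = new ArrayList<>();

  // Adds the side branch src -> trigger -> aggregator -> dst for one statistics edge.
  // The original edge is not touched, so the existing data path stays as it was.
  void insertTrigger(String trigger, String aggregator, Edge statsEdge) {
    edges.add(new Edge(statsEdge.src, trigger, "oneToOne"));
    edges.add(new Edge(trigger, aggregator, "shuffle"));
    edges.add(new Edge(aggregator, statsEdge.dst, "broadcast"));
  }

  public static void main(String[] args) {
    TriggerInsertSketch dag = new TriggerInsertSketch();
    Edge original = new Edge("src", "dst", "shuffle");
    dag.edges.add(original);
    dag.insertTrigger("trigger", "aggregator", original);
    dag.edges.forEach(System.out::println);
  }
}
```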
      • insert

        public void insert(Set<SamplingVertex> toInsert,
                           Set<IRVertex> executeAfter)
        Inserts a set of samplingVertices that process sampled data.

        This method automatically inserts the following three types of edges:
        (1) edges between samplingVertices that reflect the original relationships;
        (2) edges from the original IRDAG to samplingVertices that clone the inEdges of the original vertices;
        (3) edges from samplingVertices to the original IRDAG that respect executeAfter.

        Suppose the caller supplies the following arguments to perform a "sampled run" of vertices {V1, V2} prior to executing them:
        - toInsert: {V1', V2'}
        - executeAfter: {V1}

        Before: V1 - oneToOneEdge - V2 - shuffleEdge - V3
        After:  V1' - oneToOneEdge - V2' - controlEdge - V1 - oneToOneEdge - V2 - shuffleEdge - V3

        This preserves semantics as the original IRDAG remains unchanged and unaffected.

        (Future calls to insert() can add new vertices that connect to sampling vertices. Such new vertices will also be wrapped with sampling vertices: any new vertex that consumes outputs from sampling vertices processes only a subset of the data anyway, and no such vertex reaches the original DAG except via control edges.)

        TODO #343: Extend SamplingVertex control edges

        Parameters:
        toInsert - sampling vertices.
        executeAfter - that must be executed after toInsert.
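        The three kinds of inserted edges can be sketched on an edge list. SamplingInsertSketch is hypothetical: instead of receiving prebuilt SamplingVertex objects, it derives a twin named V' for each sampled vertex V, and it assumes that control edges start only from sampling vertices that have no sampling child (the V2' of the example). Running main prints the original two edges followed by V1' -oneToOne-> V2' and V2' -control-> V1, matching the After chain above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical edge-list DAG; the sampling twin of vertex "V" is named "V'".
final class SamplingInsertSketch {
  static final class Edge {
    final String src;
    final String dst;
    final String type;

    Edge(String src, String dst, String type) {
      this.src = src;
      this.dst = dst;
      this.type = type;
    }

    @Override
    public String toString() {
      return src + " -" + type + "-> " + dst;
    }
  }

  final List<Edge> edges = new ArrayList<>();

  static String twin(String v) {
    return v + "'";
  }

  void insertSampling(Set<String> sampled, Set<String> executeAfter) {
    Set<String> twinsWithTwinChild = new HashSet<>();
    for (Edge e : new ArrayList<>(edges)) { // iterate over a copy of the original edges
      if (!sampled.contains(e.dst)) {
        continue;
      }
      if (sampled.contains(e.src)) {
        // (1) mirror the original relationship between sampled vertices
        edges.add(new Edge(twin(e.src), twin(e.dst), e.type));
        twinsWithTwinChild.add(twin(e.src));
      } else {
        // (2) clone an in-edge from the original DAG into the sampling vertex
        edges.add(new Edge(e.src, twin(e.dst), e.type));
      }
    }
    // (3) control edges from the last sampling vertices to every executeAfter vertex
    for (String v : sampled) {
      if (twinsWithTwinChild.contains(twin(v))) {
        continue;
      }
      for (String x : executeAfter) {
        edges.add(new Edge(twin(v), x, "control"));
      }
    }
  }

  public static void main(String[] args) {
    SamplingInsertSketch dag = new SamplingInsertSketch();
    dag.edges.add(new Edge("V1", "V2", "oneToOne"));
    dag.edges.add(new Edge("V2", "V3", "shuffle"));
    dag.insertSampling(new LinkedHashSet<>(List.of("V1", "V2")), Set.of("V1"));
    dag.edges.forEach(System.out::println);
  }
}
```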
      • reshapeUnsafely

        public void reshapeUnsafely(Function<DAG<IRVertex, IREdge>, DAG<IRVertex, IREdge>> unsafeReshapingFunction)
        Reshapes the DAG unsafely, without any guarantee of preserving application semantics.

        TODO #330: Refactor Unsafe Reshaping Passes

        Parameters:
        unsafeReshapingFunction - takes as input the underlying DAG, and outputs a reshaped DAG.
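        Since the parameter is an ordinary java.util.function.Function from DAG to DAG, usage amounts to passing a lambda. In this hypothetical sketch the DAG is a map from vertex id to child ids, and bypass() is an arbitrary example reshaping that drops a vertex and reconnects its parents to its children. Nothing checks that the result preserves semantics, which is exactly the risk the method name flags.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical DAG type: vertex id -> ids of its children.
final class UnsafeReshapeSketch {
  private Map<String, List<String>> dag;

  UnsafeReshapeSketch(Map<String, List<String>> dag) {
    this.dag = dag;
  }

  // No semantic checks: whatever the function returns becomes the new DAG.
  void reshapeUnsafely(Function<Map<String, List<String>>, Map<String, List<String>>> unsafeReshapingFunction) {
    dag = unsafeReshapingFunction.apply(dag);
  }

  Map<String, List<String>> dag() {
    return dag;
  }

  // Example reshaping: drop `victim` and connect its parents directly to its children.
  static Map<String, List<String>> bypass(Map<String, List<String>> in, String victim) {
    List<String> victimChildren = in.getOrDefault(victim, List.of());
    Map<String, List<String>> out = new LinkedHashMap<>();
    for (Map.Entry<String, List<String>> e : in.entrySet()) {
      if (e.getKey().equals(victim)) {
        continue;
      }
      List<String> kids = new ArrayList<>();
      for (String c : e.getValue()) {
        if (c.equals(victim)) {
          kids.addAll(victimChildren);
        } else {
          kids.add(c);
        }
      }
      out.put(e.getKey(), kids);
    }
    return out;
  }

  public static void main(String[] args) {
    Map<String, List<String>> initial = new LinkedHashMap<>();
    initial.put("read", List.of("relay"));
    initial.put("relay", List.of("map"));
    initial.put("map", List.of());
    UnsafeReshapeSketch s = new UnsafeReshapeSketch(initial);
    s.reshapeUnsafely(d -> bypass(d, "relay"));
    System.out.println(s.dag()); // prints {read=[map], map=[]}
  }
}
```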
      • topologicalDo

        public void topologicalDo(Consumer<IRVertex> function)
        Description copied from interface: DAGInterface
        Applies the function to each node in the DAG in topological order. This function produces consistent results.
        Specified by:
        topologicalDo in interface DAGInterface<IRVertex, IREdge>
        Parameters:
        function - to apply.
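        One way to obtain the consistent ordering that topologicalDo() and getTopologicalSort() promise is Kahn's algorithm with deterministic tie-breaking. The sketch below applies that technique to a hypothetical adjacency map (vertex id to child ids); it is not necessarily how Nemo's DAG computes its order. Ready vertices are processed first-in, first-out in insertion order, so the same input always yields the same sequence, and leftover vertices signal a cycle.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

final class TopologicalSketch {
  // Kahn's algorithm over an insertion-ordered adjacency map (vertex -> children).
  // Ready vertices are processed FIFO in insertion order, so the result is deterministic.
  static List<String> getTopologicalSort(Map<String, List<String>> children) {
    Map<String, Integer> inDegree = new LinkedHashMap<>();
    for (String v : children.keySet()) {
      inDegree.putIfAbsent(v, 0);
    }
    for (List<String> kids : children.values()) {
      for (String c : kids) {
        inDegree.merge(c, 1, Integer::sum);
      }
    }
    Deque<String> ready = new ArrayDeque<>();
    inDegree.forEach((v, d) -> {
      if (d == 0) {
        ready.add(v);
      }
    });
    List<String> order = new ArrayList<>();
    while (!ready.isEmpty()) {
      String v = ready.poll();
      order.add(v);
      for (String c : children.getOrDefault(v, List.of())) {
        if (inDegree.merge(c, -1, Integer::sum) == 0) {
          ready.add(c);
        }
      }
    }
    if (order.size() != inDegree.size()) {
      throw new IllegalStateException("graph has a cycle");
    }
    return order;
  }

  static void topologicalDo(Map<String, List<String>> children, Consumer<String> function) {
    getTopologicalSort(children).forEach(function);
  }

  public static void main(String[] args) {
    Map<String, List<String>> dag = new LinkedHashMap<>();
    dag.put("read", List.of("mapA", "mapB"));
    dag.put("mapA", List.of("join"));
    dag.put("mapB", List.of("join"));
    topologicalDo(dag, System.out::println); // read, mapA, mapB, join (one per line)
  }
}
```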
      • pathExistsBetween

        public Boolean pathExistsBetween(IRVertex v1,
                                         IRVertex v2)
        Description copied from interface: DAGInterface
        Checks whether there is a path between two vertices.
        Specified by:
        pathExistsBetween in interface DAGInterface<IRVertex, IREdge>
        Parameters:
        v1 - First vertex to check.
        v2 - Second vertex to check.
        Returns:
        Whether or not there is a path between two vertices.
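        A reachability check like this can be sketched with an iterative depth-first search over a hypothetical adjacency map. The description does not say whether the path must run from v1 to v2, so this sketch assumes either direction counts and searches both ways; it also treats a vertex as reachable from itself.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class PathSketch {
  // Iterative DFS: is `to` reachable from `from` by following child edges?
  static boolean reachable(Map<String, List<String>> children, String from, String to) {
    Deque<String> stack = new ArrayDeque<>();
    stack.push(from);
    Set<String> visited = new HashSet<>();
    while (!stack.isEmpty()) {
      String v = stack.pop();
      if (v.equals(to)) {
        return true;
      }
      if (!visited.add(v)) {
        continue; // already expanded
      }
      for (String c : children.getOrDefault(v, List.of())) {
        stack.push(c);
      }
    }
    return false;
  }

  // Assumption: "a path between" v1 and v2 may run in either direction.
  static boolean pathExistsBetween(Map<String, List<String>> children, String v1, String v2) {
    return reachable(children, v1, v2) || reachable(children, v2, v1);
  }

  public static void main(String[] args) {
    Map<String, List<String>> dag = Map.of(
        "read", List.of("map"),
        "map", List.of("write"));
    System.out.println(pathExistsBetween(dag, "write", "read")); // prints true
    System.out.println(pathExistsBetween(dag, "read", "other")); // prints false
  }
}
```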
      • isCompositeVertex

        public Boolean isCompositeVertex(IRVertex irVertex)
        Description copied from interface: DAGInterface
        Checks whether the given vertex is wrapped by a LoopVertex.
        Specified by:
        isCompositeVertex in interface DAGInterface<IRVertex, IREdge>
        Parameters:
        irVertex - Vertex to check.
        Returns:
        whether or not it is wrapped by a LoopVertex
      • getLoopStackDepthOf

        public Integer getLoopStackDepthOf(IRVertex irVertex)
        Description copied from interface: DAGInterface
        Retrieves the LoopVertex stack depth of the given vertex.
        Specified by:
        getLoopStackDepthOf in interface DAGInterface<IRVertex, IREdge>
        Parameters:
        irVertex - Vertex to check.
        Returns:
        The depth of the stack of LoopVertices for the vertex.
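        Loop nesting can be sketched by letting every vertex point to the LoopVertex that directly wraps it, so the stack depth is the number of hops outward until no wrapper remains, and a vertex counts as wrapped when it has any wrapper at all. The wrappingLoop map and LoopDepthSketch are hypothetical; Nemo's actual bookkeeping for LoopVertex stacks is not described on this page.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of loop nesting: each vertex (or LoopVertex) id maps to the
// id of the LoopVertex that directly wraps it; top-level vertices have no entry.
final class LoopDepthSketch {
  final Map<String, String> wrappingLoop = new HashMap<>();

  boolean isCompositeVertex(String vertexId) {
    return wrappingLoop.containsKey(vertexId);
  }

  // Walks outward through the enclosing loops, counting one level per LoopVertex.
  int getLoopStackDepthOf(String vertexId) {
    int depth = 0;
    for (String loop = wrappingLoop.get(vertexId); loop != null; loop = wrappingLoop.get(loop)) {
      depth++;
    }
    return depth;
  }

  public static void main(String[] args) {
    LoopDepthSketch dag = new LoopDepthSketch();
    dag.wrappingLoop.put("update", "innerLoop");
    dag.wrappingLoop.put("innerLoop", "outerLoop");
    System.out.println(dag.getLoopStackDepthOf("update")); // prints 2
    System.out.println(dag.isCompositeVertex("read"));     // prints false
  }
}
```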
      • asJsonNode

        public com.fasterxml.jackson.databind.node.ObjectNode asJsonNode()
        Specified by:
        asJsonNode in interface DAGInterface<IRVertex, IREdge>
        Returns:
        JsonNode for this DAG.
      • storeJSON

        public void storeJSON(String directory,
                              String name,
                              String description)
        Description copied from interface: DAGInterface
        Stores JSON representation of this DAG into a file.
        Specified by:
        storeJSON in interface DAGInterface<IRVertex, IREdge>
        Parameters:
        directory - the directory to which the JSON representation is saved
        name - name of this DAG
        description - description of this DAG
      • getIncomingEdgesOf

        public List<IREdge> getIncomingEdgesOf(String vertexId)
        Description copied from interface: DAGInterface
        Retrieves the incoming edges of the given vertex.
        Specified by:
        getIncomingEdgesOf in interface DAGInterface<IRVertex, IREdge>
        Parameters:
        vertexId - the ID of the subject vertex.
        Returns:
        the list of incoming edges to the vertex. The result is never null; DAGBuilder ensures this.
      • getOutgoingEdgesOf

        public List<IREdge> getOutgoingEdgesOf(String vertexId)
        Description copied from interface: DAGInterface
        Retrieves the outgoing edges of the given vertex.
        Specified by:
        getOutgoingEdgesOf in interface DAGInterface<IRVertex, IREdge>
        Parameters:
        vertexId - the ID of the subject vertex.
        Returns:
        the list of outgoing edges from the vertex. The result is never null; DAGBuilder ensures this.
      • getParents

        public List<IRVertex> getParents(String vertexId)
        Description copied from interface: DAGInterface
        Retrieves the parent vertices of the given vertex.
        Specified by:
        getParents in interface DAGInterface<IRVertex, IREdge>
        Parameters:
        vertexId - the ID of the subject vertex.
        Returns:
        the list of parent vertices.
      • getChildren

        public List<IRVertex> getChildren(String vertexId)
        Description copied from interface: DAGInterface
        Retrieves the child vertices of the given vertex.
        Specified by:
        getChildren in interface DAGInterface<IRVertex, IREdge>
        Parameters:
        vertexId - the ID of the subject vertex.
        Returns:
        the list of child vertices.
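        The never-null guarantee for edge lists, and the way parents and children follow from those lists, can be sketched with a hypothetical builder-side index. EdgeIndexSketch, connect(), and the string ids are illustrative only: registering a vertex creates empty incoming and outgoing lists up front, so a lookup for a known vertex returns an empty list rather than null, while an unknown id fails fast with a NullPointerException in this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical index kept by a DAG builder: every registered vertex gets (possibly empty)
// incoming and outgoing lists, so lookups for known vertices never return null.
final class EdgeIndexSketch {
  static final class Edge {
    final String src;
    final String dst;

    Edge(String src, String dst) {
      this.src = src;
      this.dst = dst;
    }
  }

  private final Map<String, List<Edge>> incoming = new HashMap<>();
  private final Map<String, List<Edge>> outgoing = new HashMap<>();

  void addVertex(String id) {
    incoming.putIfAbsent(id, new ArrayList<>());
    outgoing.putIfAbsent(id, new ArrayList<>());
  }

  void connect(String src, String dst) {
    addVertex(src);
    addVertex(dst);
    Edge e = new Edge(src, dst);
    outgoing.get(src).add(e);
    incoming.get(dst).add(e);
  }

  List<Edge> getIncomingEdgesOf(String vertexId) {
    return Collections.unmodifiableList(incoming.get(vertexId)); // NPE for unknown ids
  }

  List<Edge> getOutgoingEdgesOf(String vertexId) {
    return Collections.unmodifiableList(outgoing.get(vertexId)); // NPE for unknown ids
  }

  // Parents are the sources of incoming edges; children are the destinations of outgoing edges.
  List<String> getParents(String vertexId) {
    List<String> parents = new ArrayList<>();
    getIncomingEdgesOf(vertexId).forEach(e -> parents.add(e.src));
    return parents;
  }

  List<String> getChildren(String vertexId) {
    List<String> kids = new ArrayList<>();
    getOutgoingEdgesOf(vertexId).forEach(e -> kids.add(e.dst));
    return kids;
  }

  public static void main(String[] args) {
    EdgeIndexSketch dag = new EdgeIndexSketch();
    dag.addVertex("source");
    dag.connect("read", "map");
    System.out.println(dag.getIncomingEdgesOf("source").size()); // prints 0
    System.out.println(dag.getParents("map"));                   // prints [read]
  }
}
```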
      • getEdgeBetween

        public IREdge getEdgeBetween(String srcVertexId,
                                     String dstVertexId)
        Description copied from interface: DAGInterface
        Retrieves the edge between two vertices.
        Specified by:
        getEdgeBetween in interface DAGInterface<IRVertex, IREdge>
        Parameters:
        srcVertexId - the ID of the source vertex.
        dstVertexId - the ID of the destination vertex.
        Returns:
        the edge, if it exists.
      • getTopologicalSort

        public List<IRVertex> getTopologicalSort()
        Description copied from interface: DAGInterface
        Gets the DAG's vertices in topologically sorted order. This function produces consistent results.
        Specified by:
        getTopologicalSort in interface DAGInterface<IRVertex, IREdge>
        Returns:
        the sorted list of vertices in topological order.
      • filterVertices

        public List<IRVertex> filterVertices(Predicate<IRVertex> condition)
        Description copied from interface: DAGInterface
        Filters the vertices according to the given condition.
        Specified by:
        filterVertices in interface DAGInterface<IRVertex, IREdge>
        Parameters:
        condition - that must be satisfied to be included in the filtered list.
        Returns:
        the list of vertices that meet the condition.
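        A predicate-based filter is a one-line stream pipeline. In the sketch below, the Vertex class with a string kind is a hypothetical stand-in; against the real API one would presumably pass a predicate such as v -> v instanceof RelayVertex instead. Preserving the input order is a property of this sketch, not a documented guarantee.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

final class FilterSketch {
  // Hypothetical vertex with a kind label (e.g., "relay", "sampling", "operator").
  static final class Vertex {
    final String id;
    final String kind;

    Vertex(String id, String kind) {
      this.id = id;
      this.kind = kind;
    }

    @Override
    public String toString() {
      return id;
    }
  }

  // Keeps the vertices that satisfy the condition, in their input order.
  static List<Vertex> filterVertices(List<Vertex> vertices, Predicate<Vertex> condition) {
    return vertices.stream().filter(condition).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Vertex> vertices = List.of(
        new Vertex("read", "operator"),
        new Vertex("relay1", "relay"),
        new Vertex("map", "operator"));
    System.out.println(filterVertices(vertices, v -> v.kind.equals("relay"))); // prints [relay1]
  }
}
```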