Class IRDAG
java.lang.Object
  org.apache.nemo.common.ir.IRDAG
- All Implemented Interfaces:
java.io.Serializable, DAGInterface<IRVertex,IREdge>
@NotThreadSafe public final class IRDAG extends java.lang.Object implements DAGInterface<IRVertex,IREdge>
An IRDAG object captures a high-level data processing application (e.g., a Spark/Beam application).
- IRVertex: a data-parallel operation (e.g., map).
- IREdge: a data dependency between two operations (e.g., shuffle).
Two broad types of IRDAG optimization (modification) methods are provided. All of these methods preserve application semantics.
- Annotation: setProperty(), getPropertyValue() on each IRVertex/IREdge.
- Reshaping: insert(), delete() on the IRDAG.
TODO #341: Rethink IRDAG insert() signatures
- See Also:
- Serialized Form
Nested Class Summary
Nested classes/interfaces inherited from interface org.apache.nemo.common.dag.DAGInterface
DAGInterface.TraversalOrder
Method Summary
boolean advanceDAGSnapshot(java.util.function.BiFunction<IRDAG,IRDAG,java.lang.Boolean> checker)
    Used internally by Nemo to advance the DAG snapshot after applying each pass.
com.fasterxml.jackson.databind.node.ObjectNode asJsonNode()
IRDAGChecker.CheckerResult checkIntegrity()
void delete(IRVertex vertexToDelete)
    Deletes a previously inserted utility vertex.
void dfsDo(IRVertex vertex, java.util.function.Consumer<IRVertex> vertexConsumer, DAGInterface.TraversalOrder traversalOrder, java.util.Set<IRVertex> visited)
    A recursive helper function for DAGInterface.dfsTraverse(Consumer, TraversalOrder).
void dfsTraverse(java.util.function.Consumer<IRVertex> function, DAGInterface.TraversalOrder traversalOrder)
    Traverses the DAG by DFS, applying the given function.
java.util.List<IRVertex> filterVertices(java.util.function.Predicate<IRVertex> condition)
    Filters the vertices according to the given condition.
java.util.List<IRVertex> getAncestors(java.lang.String vertexId)
    Retrieves the ancestors of a vertex.
LoopVertex getAssignedLoopVertexOf(IRVertex irVertex)
    Retrieves the wrapping LoopVertex of the vertex.
java.util.List<IRVertex> getChildren(java.lang.String vertexId)
    Retrieves the children vertices of the given vertex.
java.util.List<IRVertex> getDescendants(java.lang.String vertexId)
    Retrieves the descendants of a vertex.
IREdge getEdgeBetween(java.lang.String srcVertexId, java.lang.String dstVertexId)
    Retrieves the edge between two vertices.
IREdge getEdgeById(java.lang.String id)
    Retrieves the edge given its ID.
java.util.List<IREdge> getEdges()
    Retrieves the edges of this DAG.
java.util.List<Pair<java.lang.Integer,ResourceSpecification>> getExecutorInfo()
    Getter for the executor specifications information.
java.util.List<IREdge> getIncomingEdgesOf(java.lang.String vertexId)
    Retrieves the incoming edges of the given vertex.
java.util.List<IREdge> getIncomingEdgesOf(IRVertex v)
    Retrieves the incoming edges of the given vertex.
java.lang.Long getInputSize()
java.lang.Integer getLoopStackDepthOf(IRVertex irVertex)
    Retrieves the stack depth of the given vertex.
java.util.List<IREdge> getOutgoingEdgesOf(java.lang.String vertexId)
    Retrieves the outgoing edges of the given vertex.
java.util.List<IREdge> getOutgoingEdgesOf(IRVertex v)
    Retrieves the outgoing edges of the given vertex.
java.util.List<IRVertex> getParents(java.lang.String vertexId)
    Retrieves the parent vertices of the given vertex.
java.util.List<IRVertex> getRootVertices()
    Retrieves the root vertices of this DAG.
java.util.List<IRVertex> getTopologicalSort()
    Gets the DAG's vertices in topologically sorted order.
IRVertex getVertexById(java.lang.String id)
    Retrieves the vertex given its ID.
java.util.List<IRVertex> getVertices()
    Retrieves the vertices of this DAG.
void insert(java.util.Set<SamplingVertex> toInsert, java.util.Set<IRVertex> executeAfter)
    Inserts a set of samplingVertices that process sampled data.
void insert(RelayVertex relayVertex, IREdge edgeToStreamize)
    Inserts a new vertex that streams data.
void insert(MessageGeneratorVertex messageGeneratorVertex, MessageAggregatorVertex messageAggregatorVertex, EncoderProperty triggerOutputEncoder, DecoderProperty triggerOutputDecoder, java.util.Set<IREdge> edgesToGetStatisticsOf, java.util.Set<IREdge> edgesToOptimize)
    Inserts a new vertex that analyzes intermediate data and triggers a dynamic optimization.
void insert(SignalVertex toInsert, IREdge edgeToOptimize)
    Inserts a new vertex that calls for a runtime pass.
void insert(TaskSizeSplitterVertex toInsert)
    Inserts a TaskSizeSplitterVertex into the DAG.
java.lang.String irDAGSummary()
java.lang.Boolean isCompositeVertex(IRVertex irVertex)
    Checks whether the given vertex is assigned a wrapping LoopVertex.
java.lang.Boolean pathExistsBetween(IRVertex v1, IRVertex v2)
    Checks whether there is a path between two vertices.
void recordExecutorInfo(java.util.List<Pair<java.lang.Integer,ResourceSpecification>> parsedExecutorInfo)
    Setter for the executor specifications information.
void reshapeUnsafely(java.util.function.Function<DAG<IRVertex,IREdge>,DAG<IRVertex,IREdge>> unsafeReshapingFunction)
    Reshapes the DAG unsafely, without guarantees on preserving application semantics.
void storeJSON(java.lang.String directory, java.lang.String name, java.lang.String description)
    Stores the JSON representation of this DAG into a file.
void topologicalDo(java.util.function.Consumer<IRVertex> function)
    Applies the function to each node in the DAG in topological order.
java.lang.String toString()
Method Detail
-
checkIntegrity
public IRDAGChecker.CheckerResult checkIntegrity()
-
advanceDAGSnapshot
public boolean advanceDAGSnapshot(java.util.function.BiFunction<IRDAG,IRDAG,java.lang.Boolean> checker)
Used internally by Nemo to advance the DAG snapshot after applying each pass.
- Parameters:
checker - compares the dagSnapshot and the modifiedDAG to determine whether the snapshot can be set to the current modifiedDAG.
- Returns:
true if the checker passes, false otherwise.
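The snapshot contract described above can be illustrated with a toy model. The sketch below is hypothetical and is not Nemo's implementation. It stands in a list of vertex names for the DAG, and the names `SnapshotSketch`, `addVertex`, and `advanceSnapshot` are invented for illustration. It shows only the stated behavior: a pass edits a working copy, and the snapshot advances to that copy only when the `BiFunction` checker returns true.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical toy model: here a "DAG" is just a list of vertex names.
final class SnapshotSketch {
  private List<String> dagSnapshot;        // last state accepted by the checker
  private final List<String> modifiedDAG;  // working copy edited by passes

  SnapshotSketch(List<String> initialVertices) {
    this.dagSnapshot = new ArrayList<>(initialVertices);
    this.modifiedDAG = new ArrayList<>(initialVertices);
  }

  // A "pass" edits only the working copy.
  void addVertex(String vertex) {
    modifiedDAG.add(vertex);
  }

  // Advances the snapshot to a copy of the working DAG only if the checker accepts it.
  boolean advanceSnapshot(BiFunction<List<String>, List<String>, Boolean> checker) {
    if (checker.apply(dagSnapshot, modifiedDAG)) {
      dagSnapshot = new ArrayList<>(modifiedDAG);
      return true;
    }
    return false; // the snapshot is left unchanged
  }

  List<String> snapshot() {
    return dagSnapshot;
  }

  public static void main(String[] args) {
    SnapshotSketch dag = new SnapshotSketch(List.of("v1"));
    dag.addVertex("v2");
    // This checker rejects any change in vertex count, so the snapshot stays [v1].
    System.out.println(dag.advanceSnapshot((before, after) -> after.size() == before.size()));
    // This checker accepts the change, so the snapshot becomes [v1, v2].
    System.out.println(dag.advanceSnapshot((before, after) -> after.containsAll(before)));
    System.out.println(dag.snapshot());
  }
}
```

What happens to the working copy after a rejection is not stated above. The sketch simply leaves it alone.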
-
irDAGSummary
public java.lang.String irDAGSummary()
- Returns:
an IR DAG summary string, consisting only of the vertices generated by the frontend.
-
getInputSize
public java.lang.Long getInputSize()
- Returns:
the total input size of the IR DAG.
-
recordExecutorInfo
public void recordExecutorInfo(java.util.List<Pair<java.lang.Integer,ResourceSpecification>> parsedExecutorInfo)
Setter for the executor specifications information.
- Parameters:
parsedExecutorInfo - executor information parsed for processing.
-
getExecutorInfo
public java.util.List<Pair<java.lang.Integer,ResourceSpecification>> getExecutorInfo()
Getter for the executor specifications information.
- Returns:
the executor specifications information.
-
delete
public void delete(IRVertex vertexToDelete)
Deletes a previously inserted utility vertex (e.g., TriggerVertex, RelayVertex, SamplingVertex).
Note that this call may delete more than one vertex. It rolls back the changes made by the earlier insert() call while preserving application semantics.
- Parameters:
vertexToDelete - the vertex to delete.
-
insert
public void insert(RelayVertex relayVertex, IREdge edgeToStreamize)
Inserts a new vertex that streams data.
Before: src - edgeToStreamize - dst
After: src - edgeToStreamizeWithNewDestination - relayVertex - oneToOneEdge - dst (replaces the "Before" relationships)
This preserves semantics as the relayVertex simply forwards data elements from the input edge to the output edge.
- Parameters:
relayVertex - to insert.
edgeToStreamize - to modify.
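The Before/After rewiring above can be modeled on a plain edge list. This is a hypothetical sketch, not Nemo's `IREdge`/`RelayVertex` API. `RelayInsertSketch`, its nested `Edge` class, and the string pattern labels are invented for illustration. It replaces the target edge with an edge into the relay plus a one-to-one edge out of it and keeps every other edge.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of the relay rewiring; not Nemo's API.
final class RelayInsertSketch {
  // Minimal directed edge with a communication-pattern label.
  static final class Edge {
    final String src;
    final String dst;
    final String pattern;

    Edge(String src, String dst, String pattern) {
      this.src = src;
      this.dst = dst;
      this.pattern = pattern;
    }

    @Override
    public String toString() {
      return src + " -" + pattern + "-> " + dst;
    }
  }

  // Replaces src -e-> dst with src -e'-> relay -oneToOne-> dst; all other edges are kept.
  static List<Edge> insertRelay(List<Edge> edges, Edge edgeToStreamize, String relayVertex) {
    List<Edge> result = new ArrayList<>();
    for (Edge e : edges) {
      if (e == edgeToStreamize) { // the target edge is matched by identity
        // edgeToStreamizeWithNewDestination: same pattern, but it now ends at the relay
        result.add(new Edge(e.src, relayVertex, e.pattern));
        // The relay forwards each element unchanged to the original destination.
        result.add(new Edge(relayVertex, e.dst, "oneToOne"));
      } else {
        result.add(e);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Edge shuffle = new Edge("src", "dst", "shuffle");
    System.out.println(insertRelay(List.of(shuffle), shuffle, "relay"));
  }
}
```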
-
insert
public void insert(MessageGeneratorVertex messageGeneratorVertex, MessageAggregatorVertex messageAggregatorVertex, EncoderProperty triggerOutputEncoder, DecoderProperty triggerOutputDecoder, java.util.Set<IREdge> edgesToGetStatisticsOf, java.util.Set<IREdge> edgesToOptimize)
Inserts a new vertex that analyzes intermediate data and triggers a dynamic optimization.
For each edge in edgesToGetStatisticsOf:
Before: src - edge - dst
After: src - oneToOneEdge (a clone of edge) - triggerVertex - shuffleEdge - messageAggregatorVertex - broadcastEdge - dst (the "Before" relationships are unmodified)
This preserves semantics as the results of the inserted message vertices are never consumed by the original IRDAG.
TODO #345: Simplify insert(TriggerVertex)
- Parameters:
messageGeneratorVertex - to insert.
messageAggregatorVertex - to insert.
triggerOutputEncoder - to use.
triggerOutputDecoder - to use.
edgesToGetStatisticsOf - to examine.
edgesToOptimize - to optimize.
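The key property above is that the original edge survives untouched while a side branch is added next to it. The following hypothetical sketch follows the diagram literally for a single edge. `MessageInsertSketch` and the string pattern labels are invented. Encoders, decoders, and the separate edgesToOptimize set are ignored.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of the message-vertex insertion for one edge; not Nemo's API.
final class MessageInsertSketch {
  static final class Edge {
    final String src;
    final String dst;
    final String pattern;

    Edge(String src, String dst, String pattern) {
      this.src = src;
      this.dst = dst;
      this.pattern = pattern;
    }

    @Override
    public String toString() {
      return src + " -" + pattern + "-> " + dst;
    }
  }

  // Adds src -> generator -> aggregator -> dst beside `edge`; original edges are kept as-is.
  static List<Edge> insertMessageVertices(List<Edge> edges, Edge edge,
                                          String generator, String aggregator) {
    List<Edge> result = new ArrayList<>(edges);                // "Before" relationships unmodified
    result.add(new Edge(edge.src, generator, "oneToOne"));     // oneToOneEdge (a clone of edge)
    result.add(new Edge(generator, aggregator, "shuffle"));    // shuffleEdge
    result.add(new Edge(aggregator, edge.dst, "broadcast"));   // broadcastEdge
    return result;
  }

  public static void main(String[] args) {
    Edge e = new Edge("src", "dst", "shuffle");
    insertMessageVertices(List.of(e), e, "generator", "aggregator").forEach(System.out::println);
  }
}
```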
-
insert
public void insert(SignalVertex toInsert, IREdge edgeToOptimize)
Inserts a new vertex that calls for a runtime pass.
For example, suppose we want to change vertex 2's property using a runtime pass, but the relevant data is not obtained directly from vertex 2's incoming edge (e.g., it is obtained through simulation). In this case, there is no need to insert a message generator vertex and a message aggregator vertex to launch the runtime pass.
Original case:
(vertex 1) -- shuffle edge -- (vertex 2)
After inserting the signal vertex:
(vertex 1) -------------------- shuffle edge ------------------- (vertex 2)
           -- control edge -- (signal vertex) -- control edge --
Therefore, the shuffle edge to vertex 2 is executed after the signal vertex is executed. Since the signal vertex only 'signals' the launch of the runtime pass, a parallelism of 1 is sufficient for it.
- Parameters:
toInsert - the signal vertex to insert.
edgeToOptimize - the original edge to optimize (in the example above, the shuffle edge).
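In edge-list form, the signal-vertex diagram keeps the shuffle edge and adds two control edges around it. This is a hypothetical sketch with invented names (`SignalInsertSketch`, string pattern labels). It is not Nemo's `SignalVertex` API and does not model the parallelism setting.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of the signal-vertex insertion; not Nemo's API.
final class SignalInsertSketch {
  static final class Edge {
    final String src;
    final String dst;
    final String pattern;

    Edge(String src, String dst, String pattern) {
      this.src = src;
      this.dst = dst;
      this.pattern = pattern;
    }

    @Override
    public String toString() {
      return src + " -" + pattern + "-> " + dst;
    }
  }

  // Keeps edgeToOptimize and adds src -control-> signal -control-> dst, so that
  // (per the description above) the destination runs after the signal vertex.
  static List<Edge> insertSignal(List<Edge> edges, Edge edgeToOptimize, String signalVertex) {
    List<Edge> result = new ArrayList<>(edges); // the shuffle edge itself is unchanged
    result.add(new Edge(edgeToOptimize.src, signalVertex, "control"));
    result.add(new Edge(signalVertex, edgeToOptimize.dst, "control"));
    return result;
  }

  public static void main(String[] args) {
    Edge shuffle = new Edge("vertex1", "vertex2", "shuffle");
    insertSignal(List.of(shuffle), shuffle, "signal").forEach(System.out::println);
  }
}
```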
-
insert
public void insert(java.util.Set<SamplingVertex> toInsert, java.util.Set<IRVertex> executeAfter)
Inserts a set of samplingVertices that process sampled data.
This method automatically inserts the following three types of edges:
(1) Edges between samplingVertices that reflect the original relationships
(2) Edges from the original IRDAG to samplingVertices that clone the inEdges of the original vertices
(3) Edges from the samplingVertices to the original IRDAG that respect executeAfterSamplingVertices
Suppose the caller supplies the following arguments to perform a "sampled run" of vertices {V1, V2} prior to executing them:
- samplingVertices: {V1', V2'}
- childrenOfSamplingVertices: {V1}
Before: V1 - oneToOneEdge - V2 - shuffleEdge - V3
After: V1' - oneToOneEdge - V2' - controlEdge - V1 - oneToOneEdge - V2 - shuffleEdge - V3
This preserves semantics as the original IRDAG remains unchanged and unaffected.
(Future calls to insert() can add new vertices that connect to sampling vertices. Such new vertices will also be wrapped with sampling vertices, since new vertices that consume outputs from sampling vertices process only a subset of the data anyway, and no such new vertex will reach the original DAG except via control edges.)
TODO #343: Extend SamplingVertex control edges
- Parameters:
toInsert - sampling vertices.
executeAfter - vertices that must be executed after toInsert.
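The three edge kinds listed above can be reproduced on the V1/V2/V3 example with a toy edge list. This sketch is hypothetical. `SamplingInsertSketch`, the `originalToClone` map, and the string labels are invented. The rule for (3), which adds a control edge from each sampling vertex that has no sampled child, is an assumption chosen to match the example. Nemo's actual rule may differ.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model of the sampling insert(); not Nemo's API.
final class SamplingInsertSketch {
  static final class Edge {
    final String src;
    final String dst;
    final String pattern;

    Edge(String src, String dst, String pattern) {
      this.src = src;
      this.dst = dst;
      this.pattern = pattern;
    }

    @Override
    public String toString() {
      return src + " -" + pattern + "-> " + dst;
    }
  }

  // originalToClone maps each sampled vertex (e.g. "V1") to its sampling vertex (e.g. "V1'").
  static List<Edge> insertSampling(List<Edge> edges, Map<String, String> originalToClone,
                                   Set<String> executeAfter) {
    List<Edge> result = new ArrayList<>(edges); // the original DAG is left unchanged
    for (Edge e : edges) {
      boolean srcSampled = originalToClone.containsKey(e.src);
      boolean dstSampled = originalToClone.containsKey(e.dst);
      if (srcSampled && dstSampled) {
        // (1) mirror the original relationship between two sampling vertices
        result.add(new Edge(originalToClone.get(e.src), originalToClone.get(e.dst), e.pattern));
      } else if (dstSampled) {
        // (2) clone an in-edge that comes from the original DAG
        result.add(new Edge(e.src, originalToClone.get(e.dst), e.pattern));
      }
      // Edges leaving the sampled set (e.g. V2 -> V3) are not cloned.
    }
    // (3) control edges from each sampling vertex with no sampled child to executeAfter
    for (Map.Entry<String, String> entry : originalToClone.entrySet()) {
      boolean hasSampledChild = edges.stream()
          .anyMatch(e -> e.src.equals(entry.getKey()) && originalToClone.containsKey(e.dst));
      if (!hasSampledChild) {
        for (String target : executeAfter) {
          result.add(new Edge(entry.getValue(), target, "control"));
        }
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<Edge> original = List.of(new Edge("V1", "V2", "oneToOne"), new Edge("V2", "V3", "shuffle"));
    Map<String, String> clones = new LinkedHashMap<>();
    clones.put("V1", "V1'");
    clones.put("V2", "V2'");
    insertSampling(original, clones, Set.of("V1")).forEach(System.out::println);
  }
}
```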
-
insert
public void insert(TaskSizeSplitterVertex toInsert)
Inserts a TaskSizeSplitterVertex into the DAG.
- Parameters:
toInsert - the TaskSizeSplitterVertex to insert.
-
reshapeUnsafely
public void reshapeUnsafely(java.util.function.Function<DAG<IRVertex,IREdge>,DAG<IRVertex,IREdge>> unsafeReshapingFunction)
Reshapes the DAG unsafely, without guarantees on preserving application semantics.
TODO #330: Refactor Unsafe Reshaping Passes
- Parameters:
unsafeReshapingFunction - takes the underlying DAG as input and outputs a reshaped DAG.
-
topologicalDo
public void topologicalDo(java.util.function.Consumer<IRVertex> function)
Description copied from interface: DAGInterface
Applies the function to each node in the DAG in topological order. This function yields consistent results across calls.
- Specified by:
topologicalDo in interface DAGInterface<IRVertex,IREdge>
- Parameters:
function - to apply.
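Below is one standard way to visit vertices in a repeatable topological order, Kahn's algorithm. It is offered as a hypothetical sketch. The source does not say which algorithm Nemo uses. `TopoSketch` and the child-adjacency `Map` representation are invented for illustration. Ties are broken by the map's iteration order, so a `LinkedHashMap` input gives the same order on every call.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch: Kahn's algorithm over a child-adjacency map.
final class TopoSketch {
  // Returns vertices in topological order; throws if the graph contains a cycle.
  static List<String> topologicalSort(Map<String, List<String>> children) {
    Map<String, Integer> inDegree = new LinkedHashMap<>();
    for (String v : children.keySet()) {
      inDegree.putIfAbsent(v, 0);
    }
    for (List<String> cs : children.values()) {
      for (String c : cs) {
        inDegree.merge(c, 1, Integer::sum); // one incoming edge per parent
      }
    }
    Deque<String> ready = new ArrayDeque<>();
    inDegree.forEach((v, d) -> {
      if (d == 0) {
        ready.add(v); // roots have no incoming edges
      }
    });
    List<String> order = new ArrayList<>();
    while (!ready.isEmpty()) {
      String v = ready.poll();
      order.add(v);
      for (String c : children.getOrDefault(v, List.of())) {
        if (inDegree.merge(c, -1, Integer::sum) == 0) {
          ready.add(c); // every parent of c has already been emitted
        }
      }
    }
    if (order.size() != inDegree.size()) {
      throw new IllegalStateException("graph has a cycle");
    }
    return order;
  }

  // Applies `function` to each vertex in topological order, in the spirit of topologicalDo.
  static void topologicalDo(Map<String, List<String>> children, Consumer<String> function) {
    topologicalSort(children).forEach(function);
  }

  public static void main(String[] args) {
    Map<String, List<String>> g = new LinkedHashMap<>();
    g.put("a", List.of("b", "c"));
    g.put("b", List.of("d"));
    g.put("c", List.of("d"));
    g.put("d", List.of());
    topologicalDo(g, System.out::println);
  }
}
```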
-
dfsTraverse
public void dfsTraverse(java.util.function.Consumer<IRVertex> function, DAGInterface.TraversalOrder traversalOrder)
Description copied from interface: DAGInterface
Traverses the DAG by DFS, applying the given function.
- Specified by:
dfsTraverse in interface DAGInterface<IRVertex,IREdge>
- Parameters:
function - to apply.
traversalOrder - the order in which the DFS should be conducted.
-
dfsDo
public void dfsDo(IRVertex vertex, java.util.function.Consumer<IRVertex> vertexConsumer, DAGInterface.TraversalOrder traversalOrder, java.util.Set<IRVertex> visited)
Description copied from interface: DAGInterface
A recursive helper function for DAGInterface.dfsTraverse(Consumer, TraversalOrder).
- Specified by:
dfsDo in interface DAGInterface<IRVertex,IREdge>
- Parameters:
vertex - the root vertex of the remaining DAG.
vertexConsumer - the function to apply.
traversalOrder - the order in which the DFS should be conducted.
visited - the set of nodes visited.
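The split between dfsTraverse and its recursive helper dfsDo can be pictured with a small stand-alone model. In the hypothetical sketch below, the `Order` enum stands in for `DAGInterface.TraversalOrder`, whose actual constant names are not listed on this page. Vertices are strings in a child-adjacency map, and `DfsSketch` is an invented name. A single `visited` set is shared across roots so that each vertex is consumed exactly once, in either pre-order or post-order.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical sketch of a recursive DFS with pre-/post-order callbacks; not Nemo's code.
final class DfsSketch {
  // Stand-in for DAGInterface.TraversalOrder (constant names are assumptions).
  enum Order { PRE_ORDER, POST_ORDER }

  // Recursive helper: marks `vertex` visited, then recurses into unvisited children.
  static void dfsDo(String vertex, Map<String, List<String>> children, Consumer<String> consumer,
                    Order order, Set<String> visited) {
    visited.add(vertex);
    if (order == Order.PRE_ORDER) {
      consumer.accept(vertex); // before any child
    }
    for (String child : children.getOrDefault(vertex, List.of())) {
      if (!visited.contains(child)) {
        dfsDo(child, children, consumer, order, visited);
      }
    }
    if (order == Order.POST_ORDER) {
      consumer.accept(vertex); // after all children
    }
  }

  // Starts a DFS from each root, sharing one visited set across roots.
  static void dfsTraverse(List<String> roots, Map<String, List<String>> children,
                          Consumer<String> consumer, Order order) {
    Set<String> visited = new HashSet<>();
    for (String root : roots) {
      if (!visited.contains(root)) {
        dfsDo(root, children, consumer, order, visited);
      }
    }
  }

  public static void main(String[] args) {
    Map<String, List<String>> g = Map.of("a", List.of("b", "c"), "b", List.of("d"), "c", List.of("d"));
    dfsTraverse(List.of("a"), g, System.out::println, Order.POST_ORDER);
  }
}
```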
-
pathExistsBetween
public java.lang.Boolean pathExistsBetween(IRVertex v1, IRVertex v2)
Description copied from interface: DAGInterface
Checks whether there is a path between two vertices.
- Specified by:
pathExistsBetween in interface DAGInterface<IRVertex,IREdge>
- Parameters:
v1 - the first vertex to check.
v2 - the second vertex to check.
- Returns:
whether there is a path between the two vertices.
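A path check reduces to reachability. The page does not say whether "a path between" v1 and v2 is directional. The hypothetical sketch below assumes either direction counts and tests reachability both ways. `PathSketch` and the adjacency-map representation are invented for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical reachability sketch; not Nemo's implementation.
final class PathSketch {
  // Collects every vertex reachable from `start` along directed edges (start included).
  static Set<String> reachable(String start, Map<String, List<String>> children) {
    Set<String> seen = new HashSet<>();
    Deque<String> stack = new ArrayDeque<>();
    stack.push(start);
    while (!stack.isEmpty()) {
      String v = stack.pop();
      if (seen.add(v)) { // add() returns false if v was already visited
        for (String c : children.getOrDefault(v, List.of())) {
          stack.push(c);
        }
      }
    }
    return seen;
  }

  // Assumption: a path "between" v1 and v2 may run in either direction.
  static boolean pathExistsBetween(String v1, String v2, Map<String, List<String>> children) {
    return reachable(v1, children).contains(v2) || reachable(v2, children).contains(v1);
  }

  public static void main(String[] args) {
    Map<String, List<String>> g = Map.of("a", List.of("b"), "b", List.of("c"));
    System.out.println(pathExistsBetween("c", "a", g));
  }
}
```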
-
isCompositeVertex
public java.lang.Boolean isCompositeVertex(IRVertex irVertex)
Description copied from interface: DAGInterface
Checks whether the given vertex is assigned a wrapping LoopVertex.
- Specified by:
isCompositeVertex in interface DAGInterface<IRVertex,IREdge>
- Parameters:
irVertex - Vertex to check.
- Returns:
whether or not it is wrapped by a LoopVertex.
-
getLoopStackDepthOf
public java.lang.Integer getLoopStackDepthOf(IRVertex irVertex)
Description copied from interface: DAGInterface
Retrieves the stack depth of the given vertex.
- Specified by:
getLoopStackDepthOf in interface DAGInterface<IRVertex,IREdge>
- Parameters:
irVertex - Vertex to check.
- Returns:
The depth of the stack of LoopVertices for the vertex.
-
getAssignedLoopVertexOf
public LoopVertex getAssignedLoopVertexOf(IRVertex irVertex)
Description copied from interface: DAGInterface
Retrieves the wrapping LoopVertex of the vertex.
- Specified by:
getAssignedLoopVertexOf in interface DAGInterface<IRVertex,IREdge>
- Parameters:
irVertex - Vertex to check.
- Returns:
The wrapping LoopVertex.
-
asJsonNode
public com.fasterxml.jackson.databind.node.ObjectNode asJsonNode()
- Specified by:
asJsonNode in interface DAGInterface<IRVertex,IREdge>
- Returns:
JsonNode for this DAG.
-
storeJSON
public void storeJSON(java.lang.String directory, java.lang.String name, java.lang.String description)
Description copied from interface: DAGInterface
Stores the JSON representation of this DAG into a file.
- Specified by:
storeJSON in interface DAGInterface<IRVertex,IREdge>
- Parameters:
directory - the directory to which the JSON representation is saved.
name - name of this DAG.
description - description of this DAG.
-
getVertexById
public IRVertex getVertexById(java.lang.String id)
Description copied from interface: DAGInterface
Retrieves the vertex given its ID.
- Specified by:
getVertexById in interface DAGInterface<IRVertex,IREdge>
- Parameters:
id - of the vertex to retrieve.
- Returns:
the vertex.
-
getEdgeById
public IREdge getEdgeById(java.lang.String id)
Description copied from interface: DAGInterface
Retrieves the edge given its ID.
- Specified by:
getEdgeById in interface DAGInterface<IRVertex,IREdge>
- Parameters:
id - of the edge to retrieve.
- Returns:
the edge.
-
getVertices
public java.util.List<IRVertex> getVertices()
Description copied from interface: DAGInterface
Retrieves the vertices of this DAG.
- Specified by:
getVertices in interface DAGInterface<IRVertex,IREdge>
- Returns:
the list of vertices. Note that the result is never null, as ensured by DAGBuilder.
-
getEdges
public java.util.List<IREdge> getEdges()
Description copied from interface: DAGInterface
Retrieves the edges of this DAG.
- Specified by:
getEdges in interface DAGInterface<IRVertex,IREdge>
- Returns:
the list of edges.
-
getRootVertices
public java.util.List<IRVertex> getRootVertices()
Description copied from interface: DAGInterface
Retrieves the root vertices of this DAG.
- Specified by:
getRootVertices in interface DAGInterface<IRVertex,IREdge>
- Returns:
the list of root vertices.
-
getIncomingEdgesOf
public java.util.List<IREdge> getIncomingEdgesOf(IRVertex v)
Description copied from interface: DAGInterface
Retrieves the incoming edges of the given vertex.
- Specified by:
getIncomingEdgesOf in interface DAGInterface<IRVertex,IREdge>
- Parameters:
v - the subject vertex.
- Returns:
the list of incoming edges to the vertex. Note that the result is never null, as ensured by DAGBuilder.
-
getIncomingEdgesOf
public java.util.List<IREdge> getIncomingEdgesOf(java.lang.String vertexId)
Description copied from interface: DAGInterface
Retrieves the incoming edges of the given vertex.
- Specified by:
getIncomingEdgesOf in interface DAGInterface<IRVertex,IREdge>
- Parameters:
vertexId - the ID of the subject vertex.
- Returns:
the list of incoming edges to the vertex. Note that the result is never null, as ensured by DAGBuilder.
-
getOutgoingEdgesOf
public java.util.List<IREdge> getOutgoingEdgesOf(IRVertex v)
Description copied from interface: DAGInterface
Retrieves the outgoing edges of the given vertex.
- Specified by:
getOutgoingEdgesOf in interface DAGInterface<IRVertex,IREdge>
- Parameters:
v - the subject vertex.
- Returns:
the list of outgoing edges from the vertex. Note that the result is never null, as ensured by DAGBuilder.
-
getOutgoingEdgesOf
public java.util.List<IREdge> getOutgoingEdgesOf(java.lang.String vertexId)
Description copied from interface: DAGInterface
Retrieves the outgoing edges of the given vertex.
- Specified by:
getOutgoingEdgesOf in interface DAGInterface<IRVertex,IREdge>
- Parameters:
vertexId - the ID of the subject vertex.
- Returns:
the list of outgoing edges from the vertex. Note that the result is never null, as ensured by DAGBuilder.
-
getParents
public java.util.List<IRVertex> getParents(java.lang.String vertexId)
Description copied from interface: DAGInterface
Retrieves the parent vertices of the given vertex.
- Specified by:
getParents in interface DAGInterface<IRVertex,IREdge>
- Parameters:
vertexId - the ID of the subject vertex.
- Returns:
the list of parent vertices.
-
getChildren
public java.util.List<IRVertex> getChildren(java.lang.String vertexId)
Description copied from interface: DAGInterface
Retrieves the children vertices of the given vertex.
- Specified by:
getChildren in interface DAGInterface<IRVertex,IREdge>
- Parameters:
vertexId - the ID of the subject vertex.
- Returns:
the list of children vertices.
-
getEdgeBetween
public IREdge getEdgeBetween(java.lang.String srcVertexId, java.lang.String dstVertexId)
Description copied from interface: DAGInterface
Retrieves the edge between two vertices.
- Specified by:
getEdgeBetween in interface DAGInterface<IRVertex,IREdge>
- Parameters:
srcVertexId - the ID of the source vertex.
dstVertexId - the ID of the destination vertex.
- Returns:
the edge, if it exists.
-
getTopologicalSort
public java.util.List<IRVertex> getTopologicalSort()
Description copied from interface: DAGInterface
Gets the DAG's vertices in topologically sorted order. This function yields consistent results across calls.
- Specified by:
getTopologicalSort in interface DAGInterface<IRVertex,IREdge>
- Returns:
the sorted list of vertices in topological order.
-
getAncestors
public java.util.List<IRVertex> getAncestors(java.lang.String vertexId)
Description copied from interface: DAGInterface
Retrieves the ancestors of a vertex.
- Specified by:
getAncestors in interface DAGInterface<IRVertex,IREdge>
- Parameters:
vertexId - to find the ancestors for.
- Returns:
the list of ancestors.
-
getDescendants
public java.util.List<IRVertex> getDescendants(java.lang.String vertexId)
Description copied from interface: DAGInterface
Retrieves the descendants of a vertex.
- Specified by:
getDescendants in interface DAGInterface<IRVertex,IREdge>
- Parameters:
vertexId - to find the descendants for.
- Returns:
the list of descendants.
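getAncestors and getDescendants are both transitive closures. One walks parent edges and the other walks child edges. In this hypothetical sketch, `AncestrySketch`, `closure`, and `invert` are invented names. Excluding the start vertex and returning vertices in breadth-first order are assumptions. The page specifies neither.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of ancestor/descendant lookup; not Nemo's implementation.
final class AncestrySketch {
  // Breadth-first transitive closure from `start` (start itself excluded).
  static List<String> closure(String start, Map<String, List<String>> edges) {
    List<String> result = new ArrayList<>();
    Set<String> seen = new HashSet<>();
    Deque<String> queue = new ArrayDeque<>(edges.getOrDefault(start, List.of()));
    while (!queue.isEmpty()) {
      String v = queue.poll();
      if (seen.add(v)) {
        result.add(v);
        queue.addAll(edges.getOrDefault(v, List.of()));
      }
    }
    return result;
  }

  // Turns a child map (parent -> children) into a parent map (child -> parents).
  static Map<String, List<String>> invert(Map<String, List<String>> children) {
    Map<String, List<String>> parents = new HashMap<>();
    children.forEach((p, cs) -> cs.forEach(c -> parents.computeIfAbsent(c, k -> new ArrayList<>()).add(p)));
    return parents;
  }

  static List<String> getDescendants(String vertex, Map<String, List<String>> children) {
    return closure(vertex, children);
  }

  static List<String> getAncestors(String vertex, Map<String, List<String>> children) {
    return closure(vertex, invert(children));
  }

  public static void main(String[] args) {
    Map<String, List<String>> g = Map.of("a", List.of("b"), "b", List.of("c"));
    System.out.println(getDescendants("a", g) + " " + getAncestors("c", g));
  }
}
```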
-
filterVertices
public java.util.List<IRVertex> filterVertices(java.util.function.Predicate<IRVertex> condition)
Description copied from interface: DAGInterface
Filters the vertices according to the given condition.
- Specified by:
filterVertices in interface DAGInterface<IRVertex,IREdge>
- Parameters:
condition - that must be satisfied to be included in the filtered list.
- Returns:
the list of vertices that meet the condition.
-
toString
public java.lang.String toString()
- Overrides:
toString in class java.lang.Object