YARN - a giant leap in Hadoop. It provides the facility to use an "App Master" to control the process flow. So how can we leverage this?? Can we use another framework and code to play the AppMaster role?
And, at the same time, get SPEED!!
One answer is surely Tez.
What is Tez?
- It is a distributed execution framework for data-processing applications
- Based on expressing a computation as a dataflow graph
- It uses a DAG model to execute applications
- It is built on top of YARN
What does a Tez DAG contain??
Tez models data processing as a dataflow graph, with
1. vertices representing application logic
2. edges representing the movement of data
A rich dataflow definition API allows users to intuitively express complex query logic.
What is this DAG??
Tez uses a DAG (Directed Acyclic Graph): a flow of data in the form of a graph that does not loop back in cycles. Essentially, it allows higher-level tools (like Hive and Pig) to define their overall processing steps (a Directed Acyclic Graph) before the job begins. A DAG is a graph of all the steps needed to complete the job (a Hive query, a Pig job, etc.).
“Tez has a DAG API through which we can specify the producers, the consumers and the flow of data.”
(An example code sketch follows the edge definitions below.)
DAG API:
Data movement. Defines routing of data between tasks
- One-To-One: Data from the ith producer task routes to the ith consumer task.
- Broadcast: Data from a producer task routes to all consumer tasks.
- Scatter-Gather: Producer tasks scatter data into shards and consumer tasks gather the shards. The ith shard from all producer tasks routes to the ith consumer task.
Scheduling. Defines when a consumer task is scheduled
- Sequential: Consumer task may be scheduled after a producer task completes.
- Concurrent: Consumer task must be co-scheduled with a producer task.
Data source. Defines the lifetime/reliability of a task output
- Persisted: Output will be available after the task exits. Output may be lost later on.
- Persisted-Reliable: Output is reliably stored and will always be available
- Ephemeral: Output is available only while the producer task is running.
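To make this concrete, here is a minimal sketch of wiring two vertices with one such edge. It assumes the Tez 0.5+ Java API (older releases used constructors instead of the create() factories), and TokenProcessor/SumProcessor stand in for your own processor classes:

import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;

// A producer vertex ("Tokenizer", 4 tasks) and a consumer vertex ("Summer", 2 tasks).
// TokenProcessor and SumProcessor are hypothetical user-written processors.
DAG dag = DAG.create("WordCount");
Vertex tokenizer = Vertex.create("Tokenizer",
    ProcessorDescriptor.create(TokenProcessor.class.getName()), 4);
Vertex summer = Vertex.create("Summer",
    ProcessorDescriptor.create(SumProcessor.class.getName()), 2);

// An edge bundles exactly the three definitions listed above:
// data movement, scheduling and data source.
EdgeProperty shuffle = EdgeProperty.create(
    DataMovementType.SCATTER_GATHER,  // or ONE_TO_ONE / BROADCAST
    DataSourceType.PERSISTED,         // or PERSISTED_RELIABLE / EPHEMERAL
    SchedulingType.SEQUENTIAL,        // or CONCURRENT
    OutputDescriptor.create(OrderedPartitionedKVOutput.class.getName()),
    InputDescriptor.create(OrderedGroupedKVInput.class.getName()));

dag.addVertex(tokenizer)
   .addVertex(summer)
   .addEdge(Edge.create(tokenizer, summer, shuffle));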
Tez has a Runtime API through which it receives the user code and runs it as tasks.
(FYI, there are no mappers and reducers here anymore.)
RunTime API:
“Input → processor → output” is a task (previously a Mapper or a Reducer).
The input, processor and output are all configurable.
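As a sketch of what such a task looks like in code (assuming the Tez 0.5+ runtime API; WordCountProcessor is an illustrative name, not a Tez built-in):

import java.util.List;
import java.util.Map;
import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.ProcessorContext;

// The processor sits between whatever inputs and outputs the DAG's edges configure.
public class WordCountProcessor extends AbstractLogicalIOProcessor {

  public WordCountProcessor(ProcessorContext context) {
    super(context);
  }

  @Override
  public void initialize() throws Exception {
    // One-time setup before the task runs.
  }

  @Override
  public void run(Map<String, LogicalInput> inputs,
      Map<String, LogicalOutput> outputs) throws Exception {
    // Read from the configured input(s), apply the user logic,
    // and write to the configured output(s).
  }

  @Override
  public void handleEvents(List<Event> processorEvents) {
    // React to runtime events delivered to this processor, if any.
  }

  @Override
  public void close() throws Exception {
    // Clean up after the task finishes.
  }
}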
Digging deeper.
The API fits well with query plans produced by higher-level declarative applications like Apache Hive and Apache Pig.
1. An execution environment that can handle traditional MapReduce jobs. Tez treats each task, a mapper or a reducer, as running on a vertex, and the data flowing between them as edges. You can run applications written in MapReduce on Tez and make them run faster (see the sketch after this list).
2. An execution environment that handles DAG-based jobs comprising various built-in and extendable primitives.
3. Cluster-side determination of input pieces. Users can specify the inputs and outputs of the process by leveraging the Runtime API of Tez.
4. Runtime planning such as task cardinality determination and dynamic modification to the DAG structure.
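For point (1), running an existing MapReduce job on Tez is mostly a client-side switch. A sketch (the yarn-tez framework name is from the Tez install docs; it can be set in mapred-site.xml or programmatically):

import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
// Route classic MapReduce jobs through the Tez execution engine.
conf.set("mapreduce.framework.name", "yarn-tez");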
Tez Sessions.
Oh man! What are Tez sessions??
Similar to sessions in any RDBMS. All the DAGs launched by a user within a session belong to that session and are run by the same Application Master.
Hmm!! Why do I need them??
By submitting multiple DAGs to the same Application Master, the overhead of launching a new AM for each DAG is DECREASED!!
Wow!! But is that the only benefit??
No. Container reuse and caching can also be leveraged within a session. Before you shoot one more question, let me explain what they are and how they work.
Reusing containers:
From our knowledge of YARN, we know about these “containers”: a lease on a slice of a node's capacity, in which a JVM is launched and a task is run. (Visit the post on containers for more info.)
Launching a container for every small task is not a good idea. Why? As mentioned above, launching containers, and always hitting the ResourceManager to allocate containers for small, short-running tasks, can decrease your performance.
Tez comes to our rescue. Containers can be used again and again within the same DAG (the containers should be compatible, though). Not only that, containers can also be reused by other DAGs if they run in the same Tez session.
And how is this reuse scheduled? The task scheduler in the ResourceManager only launches new containers, so scheduling the reuse of containers is done by the Tez task scheduler. The Tez scheduler works with several parameters to make decisions on task assignments: task-locality requirements, compatibility of containers as described above, total available resources on the cluster, and the priority of pending task requests.
When a container becomes available for reuse, the scheduler checks whether any pending task has compatible container needs and data local to that container, and launches it; if no such task exists, it falls back to a rack-local task and, in the worst case, to a non-local (cluster-wide) task.
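This reuse behaviour is configurable. A sketch using property names from TezConfiguration (defaults may differ between releases):

import org.apache.tez.dag.api.TezConfiguration;

TezConfiguration tezConf = new TezConfiguration();
// Turn container reuse on or off.
tezConf.setBoolean("tez.am.container.reuse.enabled", true);
// Fall back to rack-local tasks when no data-local task fits a free container.
tezConf.setBoolean("tez.am.container.reuse.rack-fallback.enabled", true);
// Fall back further to non-local tasks (the "worst case" above).
tezConf.setBoolean("tez.am.container.reuse.non-local-fallback.enabled", false);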
Caching:
Apart from JVM reuse, each Tez JVM (or container) contains an object cache, which can be used to share data between different tasks running within the same container. This is a simple key-object store with different levels of visibility/retention. Objects can be cached for use within tasks belonging to the same vertex, for all tasks within a DAG, or for tasks running across a Tez session (more on sessions in a subsequent post). The cached resources may, in the future, be made available as a hint to the Tez scheduler for affinity-based scheduling.
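A small sketch of using the cache from inside a processor (assuming the ObjectRegistry interface of the Tez runtime API; the keys and cached objects here are made up for illustration):

import org.apache.tez.runtime.api.ObjectRegistry;

// Inside a processor, the registry comes from the processor context.
ObjectRegistry registry = getContext().getObjectRegistry();

// The three visibility/retention levels described above:
registry.cacheForVertex("perVertexLookup", lookupTable);  // tasks of this vertex
registry.cacheForDAG("perDagDictionary", dictionary);     // all tasks in this DAG
registry.cacheForSession("sharedModel", model);           // across DAGs in the session

// A later task scheduled into the same container can fetch objects by key.
Object dict = registry.get("perDagDictionary");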
Wonderful!! How do I use them??
Very simple.
1. First, instantiate a TezSession object with the required configuration using TezSessionConfiguration.
2. Invoke TezSession::start().
3. Wait for the TezSession to reach a ready state to accept DAGs by using the TezSession::getSessionStatus() API (this step is optional).
4. Submit a DAG to the session using TezSession::submitDAG(DAG dag), which returns a DAGClient instance.
5. Monitor the DAG’s status using the DAGClient instance obtained in step (4).
6. Once the DAG has completed, repeat step (4) and step (5) for subsequent DAGs.
7. Shut down the session once all work is done via TezSession::stop().
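Put together, the steps look roughly like this. This is a sketch against the pre-0.5 TezSession API that this post describes (later releases folded sessions into TezClient), so treat the exact constructors as illustrative; buildDag() is a hypothetical helper and exception handling is omitted:

import org.apache.tez.client.AMConfiguration;
import org.apache.tez.client.TezSession;
import org.apache.tez.client.TezSessionConfiguration;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;

TezConfiguration tezConf = new TezConfiguration();
AMConfiguration amConfig = new AMConfiguration(null, null, tezConf, null);
TezSessionConfiguration sessionConfig =
    new TezSessionConfiguration(amConfig, tezConf);              // step 1

TezSession session = new TezSession("MyTezSession", sessionConfig);
session.start();                                                 // step 2

DAG dag = buildDag();                                            // hypothetical helper
DAGClient dagClient = session.submitDAG(dag);                    // step 4

DAGStatus status = dagClient.getDAGStatus(null);                 // step 5
while (!status.isCompleted()) {
  Thread.sleep(1000);
  status = dagClient.getDAGStatus(null);
}

// step 6: submit and monitor further DAGs against the same AM here.

session.stop();                                                  // step 7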
There are some things to keep in mind when using a Tez Session:
- A Tez Session maps to a single Application Master and therefore, all resources required by any user-logic (in any subsequent DAG) running within the ApplicationMaster should be available when the AM is launched.
- This mostly pertains to code related to the VertexOutputCommitter and any user-logic in the Vertex scheduling and management layers.
- User-logic run in tasks is not governed by the above restriction.
- The resources (memory, CPU) of the AM are fixed, so please keep this in mind when configuring the AM for use in a session. For example, memory requirements may be higher for a very large DAG.
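For instance, the AM's fixed resources can be raised up front. A sketch using property keys from TezConfiguration (check the names against your release):

import org.apache.tez.dag.api.TezConfiguration;

TezConfiguration tezConf = new TezConfiguration();
// Give the session's single AM enough headroom for the largest expected DAG.
tezConf.setInt("tez.am.resource.memory.mb", 2048);
tezConf.setInt("tez.am.resource.cpu.vcores", 1);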
Everything is fine. But who is responsible for all the extra intelligence Tez showcases??
Vertex Manager:
The vertex manager is the component that resides in the Tez AppMaster and decides when the tasks in a vertex should start. It also controls parallelism: for example, it can dynamically decide the number of reducers based on the map output.
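For example, the stock ShuffleVertexManager can recompute reducer parallelism at runtime. A sketch of the knobs involved (property names from the Tez codebase; defaults vary by release):

import org.apache.tez.dag.api.TezConfiguration;

TezConfiguration conf = new TezConfiguration();
// Let the vertex manager shrink the consumer vertex's parallelism once it
// has seen the real size of the producers' output.
conf.setBoolean("tez.shuffle-vertex-manager.enable.auto-parallel", true);
// Target amount of input per consumer task when recomputing parallelism.
conf.setLong("tez.shuffle-vertex-manager.desired-task-input-size", 256L * 1024 * 1024);
// Start scheduling consumers once this fraction of producers has completed.
conf.setFloat("tez.shuffle-vertex-manager.min-src-fraction", 0.25f);
conf.setFloat("tez.shuffle-vertex-manager.max-src-fraction", 0.75f);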
DAG Scheduler: The DAGScheduler assigns each task started by the vertex manager a priority of execution depending on its depth in the graph and other factors, such as whether it is a retry or not. The task then goes to the task scheduler, which actually assigns it to a YARN container.
Task Scheduler: Alongside the pluggable scheduler in YARN, Tez contains its own scheduler to allocate containers during “container reuse”.