Trisk: Task-Centric Data Stream Reconfiguration
Yancan Mao, National University of Singapore, maoyancan@u.nus.edu
Yuan Huang, National University of Singapore, dcsyhg@nus.edu.sg
Runxin Tian, National University of Singapore, tianrunxin@u.nus.edu
Xin Wang, National University of Singapore, dcswan@nus.edu.sg
Richard T. B. Ma, National University of Singapore, tbma@comp.nus.edu.sg
ABSTRACT
Due to the long-running and unpredictable nature of stream processing, any statically configured execution of stream jobs fails to process data in a timely and efficient manner. To achieve performance requirements, stream jobs need to be reconfigured dynamically. In this paper, we present Trisk, a control plane that supports versatile reconfigurations while keeping high efficiency with easy-to-use programming APIs. Trisk enables versatile reconfigurations with usability based on a task-centric abstraction, and encapsulates primitive operations such that reconfigurations can be described by composing the primitive operations on the abstraction. Trisk adopts a partial pause-and-resume design for efficiency, through which synchronization mechanisms in the native stream systems can further be leveraged. We implement Trisk on Apache Flink and demonstrate its usage and performance under realistic application scenarios. We show that Trisk executes reconfigurations with shorter completion time and comparable latency compared to a state-of-the-art fluid mechanism for state management.

ACM Reference Format:
Yancan Mao, Yuan Huang, Runxin Tian, Xin Wang, and Richard T. B. Ma. 2021. Trisk: Task-Centric Data Stream Reconfiguration. In ACM Symposium on Cloud Computing (SoCC ’21), November 1–4, 2021, Seattle, WA, USA. ACM, New York, NY, USA, 15 pages. https://doi.org/10.1145/3472883.3487010

Permission to make digital or hard copies of part or all of this work for personal or classroom use is granted without fee provided that copies are not made or distributed for profit or commercial advantage and that copies bear this notice and the full citation on the first page. Copyrights for third-party components of this work must be honored. For all other uses, contact the owner/author(s).
SoCC ’21, November 1–4, 2021, Seattle, WA, USA
© 2021 Copyright held by the owner/author(s).
ACM ISBN 978-1-4503-8638-8/21/11.
https://doi.org/10.1145/3472883.3487010

1 INTRODUCTION
With the development of Internet-scale services, data is generated in high volume, velocity and variety. Applications with time constraints are increasingly implemented in the form of stream processing, where the arrived data is processed immediately with low latency and high throughput. Today, many distributed stream systems, e.g., Samza [32], Flink [9], Heron [24], Storm [41] and Spark Streaming [44], have been developed to parallelize, deploy and manage stream jobs for users.

As data streams by nature fluctuate, with rates and distributions that change over time, stream jobs must process data in a timely manner to satisfy low latency requirements [6, 8]. This requires stream systems to be able to reconfigure part of the dataflow computation dynamically during execution without affecting the correctness of processing logic. We define such actions as reconfigurations on stream jobs. In practice, reconfigurations are often applied by a control policy to achieve certain performance goals. Based on prior literature [10], we summarize that a good stream system should enable reconfigurations with three desirable properties: versatility, efficiency, and usability.

Versatility. A stream system should support a wide variety of reconfigurations, such that various control policies that require different types of reconfigurations can be implemented. Common reconfigurations mainly include operations along three dimensions, i.e., resources, workloads, and execution logic. The resources and workloads often need to be re-assigned to handle data skewness and changes of input rates, while the execution logic needs to be updated to fix bugs and handle emerging events [7, 9].

Efficiency. Reconfigurations should be executed and completed in a short time, with minimum impact on the original stream job execution. Stream jobs are physically executed by a set of parallel tasks. To guarantee the correctness of job execution during reconfigurations, synchronization is required among those parallel tasks, which blocks the system temporarily. Thus, it is important to execute reconfigurations efficiently to minimize the unavoidable unavailability time during reconfigurations.

Usability. A stream system should also provide intuitive and easy-to-use APIs for users to implement their control policies, ideally without assuming that users understand the details of reconfiguration execution.
Although existing works provide some of the desirable properties, they are unable to achieve all. Due to the use of a kill-and-restart method to execute reconfigurations, Flink [9], Samza [32], and Heron [24] enable reconfigurations at a high cost of efficiency. Research prototypes such as Megaphone [19] and Rhino [29] proposed efficient state management primitives with high usability, but lack support for other types of reconfigurations, such as change of logic to update execution logic. Chi [28] used a control-message-based programming model to support various control logic, but was not mainly designed for reconfigurations. As specific system-level operations need to be specified to implement a reconfiguration, Chi was targeted at advanced users that manage system internals.

In this paper, we present Trisk¹, a control plane solution that supports reconfigurations of stream jobs with all three properties. The core of Trisk is a task-centric abstraction that describes the execution plan of the target stream job. The execution plan of a stream job maintains the configurations of its physical tasks and is used to deploy the job on a cluster. Since any reconfiguration boils down to changing the existing execution plan to a new one, it can be formally described by the operations applied on the current execution plan. To provide usability, we classify the operations into three types of primitive operations, so that various reconfigurations can be implemented by applying a combination of primitive operations on the Trisk abstraction. To execute reconfigurations efficiently with low system overhead, we adopt a partial pause-and-resume mechanism by leveraging synchronization mechanisms in the native stream system, where only part of the stream job will be paused and updated. We implement Trisk on top of Apache Flink by leveraging the checkpoint mechanism to achieve synchronization, and show that Trisk achieves sub-second completion time to execute reconfigurations. In summary, we make the following contributions:
• We propose a control plane solution, Trisk, that maintains a task-centric abstraction with three-dimensional primitive operations to implement versatile reconfigurations with high usability.
• We design and implement a prepare-sync-resume pipeline to execute reconfigurations by leveraging synchronization mechanisms in the native stream system.
• We integrate Trisk with Flink and leverage the checkpoint mechanism in Flink to execute reconfigurations.
• We evaluate Trisk via comprehensive experiments using both real-world applications and a synthetic micro-benchmark. We also compare Trisk with native Flink on the performance of supporting control policies and executing reconfigurations.

¹ The source code is available at: https://github.com/sane-lab/Trisk

2 BACKGROUND AND MOTIVATION
In this section, we first introduce the terminology used in this paper, and then motivate the necessity of supporting reconfigurations with the three proposed properties.

A distributed stream job runs as a physical deployment of an execution plan which instantiates operators to physical parallel tasks. An execution plan describes the configurations of a stream job, and can be represented as a directed graph, where vertices in the graph represent tasks instantiated from operators, and edges represent the data flow between tasks. Specifically, operators maintain the user-defined execution logic to process the input data, and tasks that are instantiated from the same operator share the same execution logic. The input data of an operator forms the workloads to be processed by tasks in parallel. The workloads of an operator are commonly grouped by keys and partitioned across tasks. Each task is allocated certain resources, such as CPU cores and memory, on a node in the cluster for physical execution.

To achieve performance requirements, users often apply a control policy on stream jobs. A control policy involves two steps. First, it monitors the stream job and decides whether or not to update the current execution plan based on the symptoms detected, e.g., backpressure in the pipeline. Second, the control policy needs to identify the performance bottleneck in stream jobs and invoke reconfigurations to optimize it accordingly. Different control policies make decisions based on different kinds of metrics [14, 15, 22] at both the system level, e.g., CPU utilization, and the application level, e.g., observed arrival rate and backpressure. In this work, we focus on the execution of reconfigurations given the decisions of control policies, while metrics retrieval mechanisms are regarded as a part of the control logic.

Reconfigurations need to dynamically change the physical execution plan of a stream job, which boils down to reconfiguring its resources, workloads, and execution logic. Such a variety of reconfigurations is required by control policies to achieve different performance goals. For example, to achieve an SLO/SLA objective for general stream jobs, prior works such as Henge [23], Dhalion [14], DS2 [22], and DRS [15] introduce control policies based on scaling to reallocate resources for stream jobs. To achieve balanced load and better resource utilization, prior works such as DKG [40] propose control policies to detect data skewness and apply load balancing to manage the workloads of stream jobs. Furthermore, for machine-learning-based stream jobs such as online anomaly detection [18], because new scenarios and input data emerge over time, the model with current parameters may fail to process them accurately and effectively. To solve this problem, the model needs to be updated appropriately, where change of logic can be applied to achieve dynamic model tuning.
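The two-step control policy described in this section (detect a symptom, then locate the bottleneck and invoke a reconfiguration) can be illustrated with a minimal Python sketch. It is not taken from Trisk or from any of the cited systems: the `TaskMetrics` fields, the overload heuristic, and the `"scale_out"` decision label are all assumptions chosen for illustration.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Tuple


@dataclass
class TaskMetrics:
    """Hypothetical per-task metrics; the field names are illustrative only."""
    arrival_rate: float     # tuples/s arriving at the task
    processing_rate: float  # tuples/s the task can sustain
    backpressured: bool     # symptom flag reported by the stream system


def detect_symptom(metrics: Dict[str, TaskMetrics]) -> bool:
    """Step 1: decide whether the current execution plan should be updated."""
    return any(m.backpressured for m in metrics.values())


def find_bottleneck(metrics: Dict[str, TaskMetrics]) -> Optional[str]:
    """Step 2: pick the task whose arrival rate most exceeds its capacity."""
    overload = {
        task: m.arrival_rate / m.processing_rate
        for task, m in metrics.items()
        if m.processing_rate > 0 and m.arrival_rate > m.processing_rate
    }
    if not overload:
        return None
    return max(overload, key=overload.get)


def control_policy(metrics: Dict[str, TaskMetrics]) -> Optional[Tuple[str, str]]:
    """Return a (reconfiguration, target task) decision, or None to do nothing."""
    if not detect_symptom(metrics):
        return None
    bottleneck = find_bottleneck(metrics)
    if bottleneck is None:
        return None
    # Assumed decision: scale out the overloaded task. A real policy could
    # equally choose load balancing, placement, or a change of logic.
    return ("scale_out", bottleneck)
```

In this sketch the policy only produces a decision; executing that decision against the running job is the part the paper assigns to the control plane.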
Table 1: Overview of existing work that enables reconfigurations in stream systems.

System          Methodology                                                Versatility  Usability  Efficiency
Flink [9]       Dataflow model + Redeploy                                  Medium       High       Low
Heron [24]      Dataflow model + Redeploy                                  Medium       High       Low
SEEP [12]       State management primitives + Partial redeploy             Low          High       Medium
Rhino [29]      State management primitives + Partial update               Low          High       High
Megaphone [19]  State management primitives + Non-stop partial update      Medium       High       High
Chi [28]        Message-based programming model + Partial update           High         Medium     High
Trisk           Three-dimensional task-centric abstraction + Partial update High         High       High
Although reconfiguration is best supported with three properties [10], namely versatility, efficiency, and usability, existing systems and research fall short of achieving all of them. We summarize existing works that support reconfigurations for stream jobs in Table 1, and classify them into three types of implementations.

Built-in reconfiguration leverages the original dataflow model and programming interfaces provided by stream systems to enable reconfigurations. For example, Flink [9] and Heron [24] redeploy the stream job with updated context for all tasks, i.e., restarting the job with modified source code and configuration files. Although reconfigurations can be easily invoked through the original programming interfaces provided by the stream systems, they are executed with low efficiency and incur high system overhead and performance degradation due to the nature of kill-and-restart.

Reconfiguration for state management has been designed in prior works such as SEEP [12], Rhino [29] and Megaphone [19]. These works proposed state management primitives that provide interfaces to manage the state of stream jobs efficiently. Stateful stream jobs maintain state to process each of the assigned keys, which is regarded as a workload-related configuration in our context. With the provided interfaces, reconfigurations that cover workload redistribution for stateful jobs can be implemented with high usability and efficiency. However, such primitives are limited to state management and do not support other types of reconfigurations such as placement and change of logic.

Reconfiguration via a control plane encapsulates mechanisms for applying various control logic on stream jobs, which supports a variety of reconfigurations. Chi [28] proposes a programming model based on control message injection, through which new reconfigurations can be implemented by applying fine-grained instructions on each task and embedding them into control messages. Tasks are updated asynchronously upon receiving the instructions in the control messages. However, since the task update logic is defined by users, they need to be familiar with the execution details of the stream system and implement instructions accordingly, which requires non-trivial engineering effort.

Aiming to achieve all three desirable properties for stream reconfigurations, Trisk is designed as a control plane solution applicable to general stream systems, and encapsulates mechanisms for general control policies. To achieve versatility, Trisk uses a task-centric abstraction, which describes the configurations of each task in three dimensions, i.e., resources, workloads and execution logic. The Trisk abstraction is designed around tasks, as the states of tasks describe configurations at the minimum granularity, i.e., reconfigurations can be achieved by updating a subset of tasks. For example, load balancing redistributes workloads among tasks, scaling cancels or deploys tasks, placement redeploys tasks on other nodes, and change of logic updates the execution logic of tasks. Based on the abstraction, Trisk implements three primitive operations (Section 3.1) for updating tasks along the three dimensions and encapsulates them as a set of APIs. For usability, Trisk provides common reconfigurations (Section 3.3) for users to implement control policies easily, while any general reconfiguration can be implemented by composing primitive operations (Section 4.2). Trisk uses a prepare-sync-update execution pipeline to execute reconfigurations efficiently (Section 3.2), under which tasks are partially paused and updated asynchronously. This enables Trisk to leverage the synchronization mechanisms in native stream systems with low system overhead.

3 DESIGN
We focus on the problem of reconfiguring stream jobs on-the-fly, and our goal is to design a control plane that enables versatile reconfigurations while maintaining usability and efficiency. In this section, we first introduce the design of the Trisk abstraction, and then describe the mechanisms that enable asynchronous execution of the reconfiguration. Last, we present the reconfiguration APIs and show how users can implement control policies by using them.

3.1 The Trisk Abstraction
The Trisk abstraction maintains an abstract execution plan that is independent of stream systems for extensibility. This
is achieved by specifying the execution plan in terms of the configurations with respect to individual tasks, which can be classified along three dimensions: execution logic, workloads, and resources. In other words, any reconfiguration consists of mainly three types of operations. The intuition behind the three-dimensional Trisk abstraction is derived from the three-step deployment of stream jobs shown in Figure 1. This stream job has two operators (O1, O2), which are instantiated as three tasks (T1, T2, T3) physically deployed across two machines. The keyspace of the data stream contains four unique keys and is partitioned into two substreams.

[Figure 1: Deployment steps of jobs in stream systems. Labels: Input Keys, Logical topology, Parallelizing, Intermediate Stream, Parallelized tasks, Deploying, Physical execution, Message Queue.]

In the first step, a stream job is defined by its logical topology described as a DAG, where vertices represent operators and edges represent the intermediate data streams. At this stage, the execution logic is configured and associated with each operator, implying that all instances of parallel tasks of the operator will use the same execution logic to process the assigned input streams so as to generate outputs.

In the second step, the stream job specifies the number of parallel tasks to be instantiated for each operator. Input data is often defined over a key space, and each task is assigned a non-overlapping subset of keys for independent data processing. The configuration is maintained by both upstream and downstream tasks. In particular, the upstream tasks maintain the routing information, which maps their processing results to downstream tasks. Each downstream task keeps a subset of input keys representing the subset of substreams to be processed and the corresponding states to be managed. At this stage, the configuration of workloads needs to be specified for the individual tasks.

In the final step, the stream job deploys tasks on physical machines. In particular, each task is assigned to a resource slot configured with resources that determine its performance. For example, computational resources such as CPU cores affect the processing rate, while memory is used to store ongoing processing state and affects the speed of I/O operations. Furthermore, data streams between an upstream and a downstream task go through the network if the two tasks are deployed on different physical machines.

[Figure 2: Configurations of tasks in Trisk abstraction. Labels: UDF, Physical Machine, Key States, Key Mappings, Resource slot.]

Figure 2 illustrates the four configurations associated with each task specified in the Trisk abstraction, i.e., Key State, User-Defined Function (UDF), Key Mapping, and Resource Slot.
• Along the execution logic dimension, the User-Defined Function (UDF) defines the processing logic applied to each input tuple that the task receives. After processing, the results from the UDF form the output streams. For stateful tasks, the UDF has access to its processing state, which is generated according to the processing history of arrived data, and updates the state after new tuples are processed.
• Along the workloads dimension, the distribution of workloads among the tasks of an operator is described by the Key State distributed across the tasks and the Key Mapping in the upstream tasks. Key State represents the assigned subset of input keys to be processed and the associated processing state to be maintained. Key Mapping defines how a task maps the keys of output results to downstream tasks. The Key Mapping in the upstream tasks also represents the global Key State assignment of downstream tasks, i.e., the combination of the Key State of all downstream tasks.
• Along the resources dimension, Resource Slot denotes the amount of resources allocated to a task, e.g., CPU cores and memory obtained from the resource management system; it also describes the location where the task is deployed, which is important for communication efficiency and avoiding resource contention.

Our task-centric abstraction is general enough to provide versatility of reconfigurations, because any reconfiguration boils down to updating the three types of task configurations, which are originally set by the initial deployment of stream jobs. Besides the chosen configurations, the Trisk abstraction can be easily extended, since all configurations are generated during the initial deployment. For example, the batch size in mini-batch processing can be classified as a type of execution logic configuration that defines how input tuples are batched. Based on the dimensions of execution logic, workloads and resources, Trisk implements the common reconfigurations of change of logic, load balancing and placement, respectively. Furthermore, by using operations along the dimensions of workloads and resources, Trisk also implements scaling.
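To make the four per-task configurations concrete, the following Python sketch models an abstract execution plan and three of the reconfigurations named above as plain updates to task configurations. It is a minimal illustration under stated assumptions, not Trisk's actual API: the class names, field names, and the functions `rebalance`, `change_logic`, `place`, and `is_consistent` are hypothetical, and the sketch ignores the synchronization needed on a running job.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, Iterable


@dataclass
class ResourceSlot:
    """Resources dimension: where a task runs and what it is allocated."""
    node: str
    cpu_cores: int
    memory_mb: int


@dataclass
class Task:
    """One task of the abstract execution plan (all names are hypothetical)."""
    udf: Callable[[int, int], int]  # execution logic: (state, value) -> new state
    resource_slot: ResourceSlot
    key_state: Dict[str, int] = field(default_factory=dict)    # assigned key -> state
    key_mapping: Dict[str, str] = field(default_factory=dict)  # output key -> downstream task


Plan = Dict[str, Task]


def count(state: int, value: int) -> int:
    """Example UDF: count the tuples seen for a key."""
    return state + 1


def rebalance(plan: Plan, upstream: str, key: str, dst: str) -> None:
    """Load balancing: move one key and its state to downstream task dst."""
    src = plan[upstream].key_mapping[key]
    if src != dst:
        plan[dst].key_state[key] = plan[src].key_state.pop(key)
        plan[upstream].key_mapping[key] = dst


def change_logic(plan: Plan, task_id: str, new_udf: Callable[[int, int], int]) -> None:
    """Change of logic: swap the UDF of a single task."""
    plan[task_id].udf = new_udf


def place(plan: Plan, task_id: str, node: str) -> None:
    """Placement: record a new node in the task's resource slot."""
    plan[task_id].resource_slot.node = node


def is_consistent(plan: Plan, upstream: str, downstream: Iterable[str]) -> bool:
    """The upstream Key Mapping must equal the union of downstream Key States."""
    assigned = {k: t for t in downstream for k in plan[t].key_state}
    return assigned == plan[upstream].key_mapping


def build_example_plan() -> Plan:
    """T1 (upstream) feeds T2 and T3; four keys, initially skewed towards T2."""
    return {
        "T1": Task(udf=count, resource_slot=ResourceSlot("node-1", 1, 512),
                   key_mapping={"a": "T2", "b": "T2", "c": "T2", "d": "T3"}),
        "T2": Task(udf=count, resource_slot=ResourceSlot("node-1", 1, 512),
                   key_state={"a": 5, "b": 2, "c": 7}),
        "T3": Task(udf=count, resource_slot=ResourceSlot("node-2", 1, 512),
                   key_state={"d": 1}),
    }
```

Calling `rebalance(plan, "T1", "c", "T3")` on the example plan moves key `c` and its state from T2 to T3 and updates the routing in T1 in the same step, which keeps the upstream Key Mapping equal to the combined Key State of the downstream tasks, as the workloads bullet requires.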