Plumb is a system for processing streams of large files. A Plumb workflow consists of a sequence of Jobs, each of which processes files from its input queue and produces output that can be queued for processing by one or more other Jobs.
A Plumb Job consumes one or more inputs and creates one or more outputs.
By default, with one input and one output, data arrives on stdin and goes out on stdout. (The data comes from HDFS, but your program doesn't need to care.)
A Plumb job is created for each input that appears on a queue of a given type. Plumb queues will be automatically created if they don’t exist.
A Plumb job is defined with YAML that specifies the program, the input queue, and the output queue.
For example:
- program: sz_to_xz
input: pcap.sz
output: pcap.xz
with the program (sz_to_xz):
#!/bin/sh
# sz_to_xz
szcat | xz -c
This will recompress every file in the pcap.sz queue and place the results in the pcap.xz queue.
The program line can take arguments, but it does not support arbitrary shell expressions such as pipelines (which is why sz_to_xz has to be a separate script). If the program line contains spaces or single quotes, make sure to enclose the whole value in double quotes. For example:
- program: "/user/plumb/jobserver/user-progs/plumbmover.sh -M pstream -R1 -O day -v -v -S'-i /path/to/somekey' -H somebody@host.isi.edu - /some_destination/"
If the total number of inputs and outputs for a pipeline stage exceeds two, Plumb instead provides them on the command line, using a -i switch for each input and a -o switch for each output. The argument following each -i or -o is a local FIFO with a name like 20190709-212835-00011659.ari.pcap.sz. Such names let the user program robustly determine which argument is which; it is not a good idea to rely on positional arguments (see the sketch after the pipeline example below). The user program reads from the input FIFOs and writes to the output FIFOs, while Plumb feeds data into the input FIFOs and drains data from the output FIFOs.
A plander job that starts with pcap.sz and ultimately ends with message_question.fsdb.xz:
-
input: pcap.sz
program: "/user/plumb/jobserver/user-progs/dnsanon_wrapper_snzip.sh"
output: [ message_question.fsdb, message.fsdb ]
-
input: message_question.fsdb
program: "/user/plumb/jobserver/user-progs/xz2"
output: message_question.fsdb.xz
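The first stage above has one input and two outputs (three endpoints in total), so Plumb hands it FIFOs via -i and -o rather than stdin and stdout. The following is only a hedged sketch of how such a wrapper could select its FIFOs by name suffix instead of by position; the real dnsanon_wrapper_snzip.sh may be organized differently, and "your_dns_parser" with its --questions/--messages options is a hypothetical placeholder for the actual processing command.
#!/bin/sh
# Hedged sketch: a wrapper for a stage with one input and two outputs.
# Plumb passes each endpoint as "-i FIFO" or "-o FIFO"; the FIFOs are
# selected here by name suffix rather than by argument position.
in_pcap="" out_question="" out_message=""
while [ $# -gt 0 ]; do
  case "$1" in
    -i) shift
        in_pcap="$1" ;;                               # e.g. ...ari.pcap.sz
    -o) shift
        case "$1" in
          *.message_question.fsdb) out_question="$1" ;;
          *.message.fsdb)          out_message="$1" ;;
        esac ;;
  esac
  shift
done

# "your_dns_parser" and its options are hypothetical placeholders for the
# real processing done by dnsanon_wrapper_snzip.sh.
szcat < "$in_pcap" | your_dns_parser --questions "$out_question" --messages "$out_message"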
The program needs to reside somewhere in HDFS, at a location readable by the user configured to run Hadoop.
Plumb can feed more than one input into a stage program, but we currently don't have a working example of such a case. One caveat is that if the data on multiple inputs do not overlap in time, the processing semantics for the code are unclear. We hope that our ongoing work on advanced windowing will resolve this issue.
Other options can be provided, in addition to input and output. Rate limiting (maximum number of concurrent jobs) can be specified with:
max_concurrent_jobs: 10
In the future we plan to add resource constraints. For example, core: 2 and memory_gb: 16 will reserve the given number of cores or amount of memory for the job. (By default today, each job gets 1 core and about 2 GB of memory.) This implies that the user program should strive to use just one core and should not buffer a lot of data.
At this point we have not yet enabled YARN's CPU enforcement, but we plan to do so in the future.
As an experimental feature, Plumb currently supports windowing, e.g.:
-
input: rssacint
program: "/mapreduce_wrapper.sh"
output: rssac_mapreduce_stage1
window:
size: "24 hours"
start: "2020-04-01-00-00-UTC"
max_wait: "3 hours"
cutoff_delivery_requirement: "Cutoff_Exactly_Once_To_Current_Window"
use_queue_sequence_number_instance: "Yes"
queue_sequence_number_instance_list: "lax,mia,ari"
or
-
input: message.fsdb
program: "/user/aqadeer/jobserver/user-progs/rssac_wrapper.sh"
output: rssacint
window:
start_and_end: "2020-04-02-00-00-UTC to 2020-04-05-00-00-UTC"
For stateful streaming, use this style of YAML:
-
input: pcap.sz
program: "/user/plumb/jobserver/user-progs/zeek_app.sh"
output: [weird.log, notice.log ]
abstraction: stateful_streaming
(xxx need more details here)
When Plumb runs a user program, the following environment variables are available to the job:
PLUMB_INPUTS
Example:
PLUMB_INPUTS=/user/plumb/hle/raw/message_question.fsdb/conqdata/20190717-174112-01529933.lax.message_question.fsdb
PLUMB_OUTPUTS
Example:
PLUMB_OUTPUTS=/user/plumb/hle/raw/message_question.fsdb.xz/20190717-174112-01529933.lax.message_question.fsdb.xz
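A hedged sketch of how a job script might use these variables, assuming they only describe the HDFS objects behind the current input and output (the data itself still arrives on stdin or the -i FIFOs):
#!/bin/sh
# Hedged sketch: log which HDFS objects this run corresponds to; useful
# when correlating a failed run with its queue entries afterwards.
echo "PLUMB_INPUTS:  $PLUMB_INPUTS"  >&2
echo "PLUMB_OUTPUTS: $PLUMB_OUTPUTS" >&2
# Processing itself is unchanged: data still arrives on stdin (or -i FIFOs).
szcat | xz -c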
Jobs have 4 possible states: submitted, accepted, rejected, retracted. They start out submitted, then become accepted once they start executing (or rejected if they are misconfigured). An accepted job becomes retracted when a user asks to have it removed.
Run /usr/local/bin/plumb-job on any system with Plumb installed to see the list of available commands.
New jobs are described in a YAML file and submitted as:
/usr/local/bin/plumb-job submitJob /tmp/szToxz.yaml
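As a worked example, reusing the recompression job from the beginning of this page and /tmp/szToxz.yaml as a scratch path, the whole submission could look like:
# Write the job definition shown earlier, then submit it.
cat > /tmp/szToxz.yaml <<'EOF'
- program: sz_to_xz
  input: pcap.sz
  output: pcap.xz
EOF
/usr/local/bin/plumb-job submitJob /tmp/szToxz.yaml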
Note that the Plumb application server needs to wait for all currently running jobs on the YARN cluster to finish, and must then restart, to incorporate any submit or retract changes across all users. This process can take a while, because many jobs may be running on the cluster.
The current YARN cluster status can be monitored via the YARN status page:
http://<head-node>:8088/cluster/nodes
Once submitted, the job will start as soon as all current running jobs complete. (An implication is that submitting a new job requires draining the queue, so don’t do it casually.)
To retract a job, run:
/usr/local/bin/plumb-job retractJob JOB-URI
As with submission, all currently running jobs must first finish; after that the job is retracted.
(xxx: what are job-uris? are they the lines from listAcceptedJobs?)
plumb-job listAcceptedJobs # lists accepted jobs for you (the current user)
plumb-job listSubmittedJobs
plumb-job listRejectedJobs
plumb-job listRetractedJobs
Create a pipeline.yml file in the current directory with details for all jobs in the system:
plumb-job getOptYamlText
Create a pipeline.png file showing a diagram of the optimized workflow:
plumb-job getOptYamlDiagram
The command /usr/local/bin/plumb allows one to watch data flow through the system.
Enqueue a file <filename> into the queue <qname>:
plumb put <filename> <qname>
List all of the current active queue names:
plumb ls-q
List currently queued files (aka large blocks) for a given <qname>:
plumb ls-f <qname>
Remove a file from a queue:
plumb rm-f <qname> <filename>
Display a queue entry checksum:
plumb queue-entry-checksum <file-uri>
Dump a file's content to stdout:
plumb queue-entry-cat <file-uri>
Note: it is advised to put double quotes around <qname>, because some characters in names (such as the dot), if provided without double quotes, can cause the program to give wrong results.
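For example, quoting queue names that contain dots (both names appear elsewhere on this page):
plumb ls-f "pcap.sz"
plumb ls-f "message_question.fsdb.xz"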
List faulty files in a queue:
plumb ls-faulty
Mark a file in a queue as faulty:
plumb queue-entry-fault
Mark a file as healthy again so that Plumb retries it for processing:
plumb queue-entry-restore
Please note that the windowing functionality is not well tested and cannot be used in a production environment at this time.
List those queues that have windows:
plumb ls-windowed-q
List the possible states a window can be in:
plumb ls-all-window-statuses
List the windows inside a queue:
plumb ls-win
List the windows of a queue that are in a specific status:
plumb ls-win-in-status
Set the status of a window (for example, to artificially mark a window FORMED so that it runs for processing):
plumb set-win-status
List the files in a window:
plumb ls-f-in-win
Change the window number for a file:
plumb mv-f-in-win
Experimental functions to facilitate debugging (xxx don't work):
plumb ls-missing-files-in-a-window, ls-site-info-in-a-window
Note: unlike the other commands in the plumb tool, all windowing functions need some state from Plumb; therefore the HLE/Window Manager must be operational and up-to-date for the above windowing functions to work.
If the new windowing does not work as expected, the course of action is the debugging procedure described below.
If a user job fails, the user needs to understand what went wrong. Plumb generates substantial diagnostic data for each job to facilitate such root-cause analysis. Users can also generate their own debugging data from their code (e.g. using set -x in a bash script). To see all of this debugging data, do the following:
1) Open the YARN applications page, http://<headnode>:8088/cluster/apps/, and locate the application ID of the job, e.g. application_1568404903922_5011798.
2) Open a terminal to any of the cluster boxes and run the following command to view the YARN logs:
yarn logs -applicationId application_1568404903922_5011798
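The log output can be large, so it is often convenient to redirect it to a file for later inspection (the path below is just an example):
yarn logs -applicationId application_1568404903922_5011798 > /tmp/plumb-job-5011798.log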
Data can be simply put into a desired queue by running:
plumb put <fileuri> <qname>
where:
<fileuri> points either to a local file or to a file in HDFS
<qname> is one of the queues created by running plumb-job submitJob and displayed by running plumb ls-q
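For example, enqueueing a hypothetical local snzip-compressed capture into the pcap.sz queue (the file name simply follows the pattern seen earlier):
plumb put /tmp/20190709-212835-00011659.ari.pcap.sz "pcap.sz"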