Spark Streaming auto restart

The company's real-time streaming pipeline uses Spark Streaming to process data ingested through Kinesis. A Streaming task terminated once before and no alarm was raised, and the company's big data department currently consists of a single person. To avoid being phoned in the middle of the night to restart the service, we decided to make the Streaming task restart automatically.

1. Overall approach

The idea behind the automatic restart is simple: periodically check the tasks running on YARN and see whether the Streaming task submitted for the production environment is among them. On Linux, periodic checks can be handled with crontab. The finest granularity crontab offers is one minute, which means the service could stay down for up to a minute before being restarted; for my usage scenario that is barely acceptable. A minimal sketch of the check itself follows.
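The detection step boils down to one shell pipeline. Here is a minimal sketch of it in Python, assuming the task was submitted under the name dime-streaming-qa used in the scripts below:

import os

# List YARN applications and grab the ID of the one whose name
# matches; an empty string means the task is not running.
app_id = os.popen(
    "yarn application -list | grep dime-streaming-qa | awk '{print $1}'"
).read().strip()
print("running" if app_id else "missing")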

2. Scripting

The restart script restart.py is written in Python. It invokes a second script, submit-job.py, which wraps the command that submits the Streaming task and can be configured differently depending on your environment. restart.py checks whether the tasks currently running (submitted) on YARN include one for the specified environment, and restarts the task if they do not.

import os
import sys
import threading
import time

# Submit the Streaming job in a background thread so the blocking
# os.system call does not hold up the check loop.
class SubmitJob(threading.Thread):
    def __init__(self, env):
        threading.Thread.__init__(self)
        self.env = env

    def run(self):
        cmd = "python /home/hadoop/submit-job.py " + self.env
        print(cmd)
        os.system(cmd)

# Return the YARN application ID of the running Streaming task,
# or an empty string if it is not listed.
def get_app_id():
    return os.popen(
        "yarn application -list | grep dime-streaming-qa | awk '{print $1}'"
    ).read().strip()

app_id_qa = get_app_id()

if len(app_id_qa) == 0:
    print("streaming job is missing, prepare to restart it!")
    # Try up to five times, re-checking YARN 30 seconds after
    # each submission attempt.
    count = 5
    while len(app_id_qa) == 0 and count > 0:
        job = SubmitJob("qa")
        job.start()
        time.sleep(30)
        app_id_qa = get_app_id()
        if len(app_id_qa) != 0:
            print("restart success! System exit")
            sys.exit()
        count = count - 1
else:
    print("streaming job is still working")
The submission script submit-job.py builds and runs the spark-submit command; only the qa environment is shown here:

import os
import sys

env = sys.argv[1]
print(env)
if env == "qa":
    # Submit the Streaming job to YARN in cluster mode; the application
    # name must match the one grepped for in restart.py.
    cmd = ("spark-submit --class com.strikingly.dime.spark.DimeJob "
           "--master yarn --deploy-mode cluster --name dime-streaming-qa "
           "--conf spark.driver.memory=1g --num-executors=2 "
           "--executor-cores=2 --executor-memory=2G /home/hadoop/dime-jobs.jar "
           "-d 's3://dime-checkpoint/dime-preproduction/' "
           "-t streaming -n test > /dev/null 2>&1")
else:
    print("Not supported environment: %s, please pass in 'prod', 'preprod' or 'qa'!" % env)
    sys.exit(1)

os.system(cmd)
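Before handing the scripts over to cron, it is worth a manual dry run to confirm the submission works; the paths below match the ones assumed above:

python /home/hadoop/submit-job.py qa    # submit the qa Streaming task once
python /home/hadoop/restart.py          # should now report the job is still working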

3. Timed execution

Use crontab to run the script every minute and check whether the task is alive. Execute crontab -e and add the line */1 * * * * python /home/hadoop/restart.py.
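If you want to keep the script's output for debugging, the cron entry can redirect it to a log file; the log path here is my own assumption, not part of the original setup:

*/1 * * * * python /home/hadoop/restart.py >> /home/hadoop/restart.log 2>&1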
