
How to send a phone message when a Spark application ends

2017-05-03 15:59
Sometimes we want to receive a message when a Spark application ends. The application-end event is especially important for a streaming application: such an application is expected to run continuously, so its ending usually means that an error has occurred.

To embed the service in an application, first import the SendMsgOnAppEndListener class.

import org.apache.spark.plugin.xbu.bigdata.SendMsgOnAppEndListener


Second, add the listener to the SparkContext.

Parameter description:

url: the URL of the phone message service.

phoneList: the phone numbers that receive the message when the application ends, separated by commas.

msg: the message text.

val sc: SparkContext = new SparkContext(sparkConf)
sc.addSparkListener(new SendMsgOnAppEndListener(
  url = "http://server:port/msg/sendmsg",
  phoneList = "xxxxxx",
  msg = "TestSendMsgOnAppEndListener error"))
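
To make the parameters concrete, here is a minimal sketch of the form body that ends up in the POST request. It uses the JDK's URLEncoder rather than the HttpClient classes of the listener, so it only approximates what the listener sends; the object name FormBodySketch and the sample values are made up for illustration.

```scala
import java.net.URLEncoder

object FormBodySketch {
  // Build an application/x-www-form-urlencoded body from the two form fields
  // that the listener posts ("phonelist" and "msg").
  def formBody(phoneList: String, msg: String): String =
    Seq("phonelist" -> phoneList, "msg" -> msg)
      .map { case (k, v) => URLEncoder.encode(k, "UTF-8") + "=" + URLEncoder.encode(v, "UTF-8") }
      .mkString("&")
}
```

For example, FormBodySketch.formBody("111,222", "app ended") gives phonelist=111%2C222&msg=app+ended: the comma between the phone numbers and the space in the message are both escaped.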


The source of SendMsgOnAppEndListener:

package org.apache.spark.plugin.xbu.bigdata

import org.apache.spark.scheduler._
import org.apache.http.impl.client.DefaultHttpClient
import org.apache.http.message.BasicNameValuePair
import org.apache.http.client.methods.HttpPost
import org.apache.http.NameValuePair
import org.apache.http.client.entity.UrlEncodedFormEntity
import java.util.ArrayList
/**
* Created by houzhizhen on 17-5-2.
*/
class SendMsgOnAppEndListener(
    val url: String = "http://yq01-sw-hds12.yq01.baidu.com:8001/msg/sendmsg",
    val phoneList: String,
    val msg: String) extends SparkListenerInterface {

  // Only the application-end event is of interest; every other callback is a no-op.
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {}

  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {}

  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {}

  override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult): Unit = {}

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {}

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {}

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {}

  override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = {}

  override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = {}

  override def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = {}

  override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = {}

  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {}

  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
    val httpClient = new DefaultHttpClient()
    try {
      // POST the phone list and the message as URL-encoded form fields.
      val httppost = new HttpPost(url)
      val formparams = new ArrayList[NameValuePair]()
      formparams.add(new BasicNameValuePair("phonelist", phoneList))
      formparams.add(new BasicNameValuePair("msg", msg))
      val uefEntity = new UrlEncodedFormEntity(formparams, "UTF-8")
      httppost.setEntity(uefEntity)
      httpClient.execute(httppost)
    } finally {
      // Release the client's connections whether or not the request succeeded.
      httpClient.getConnectionManager.shutdown()
    }
  }

  override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = {}

  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {}

  override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {}

  override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {}

  override def onOtherEvent(event: SparkListenerEvent): Unit = {}
}
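
The HTTP call in onApplicationEnd throws if the message server cannot be reached, and once the application has ended there is no later chance to send the message. One possible way to harden this is a small retry wrapper. The sketch below is not part of the original listener: the object NotifySafely and its attempt method are hypothetical names, and it uses only the Scala standard library.

```scala
import scala.util.control.NonFatal

object NotifySafely {
  // Runs `send` up to `maxAttempts` times. Returns true as soon as one call
  // completes without throwing, and false if every attempt fails with a
  // non-fatal exception.
  @annotation.tailrec
  def attempt(maxAttempts: Int)(send: () => Unit): Boolean =
    if (maxAttempts <= 0) false
    else {
      val ok =
        try { send(); true }
        catch {
          case NonFatal(e) =>
            System.err.println(s"notification attempt failed: ${e.getMessage}")
            false
        }
      if (ok) true else attempt(maxAttempts - 1)(send)
    }
}
```

Inside onApplicationEnd, the call httpClient.execute(httppost) could then be wrapped as NotifySafely.attempt(3)(() => httpClient.execute(httppost)). Whether three attempts is a sensible number depends on the message service.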