scala 下 kafka 实战简介
2018-01-22 14:13
288 查看
kafka应用实战:从一个topic接受数据,调用接口处理后发送到另一个topic
关于kafka,其实我也不知道怎么样才能介绍的具体明了,但是我就是想提供一个简单的使用示例,让我们对kafka有个一初步的了解。其实吧,我们知道,数据可以从某个角度分成两类,第一类是批量数据,也就是已经放好的数据,打个比方,就像池塘的水,不会流动,你想怎么计算就怎么计算,你可以统计一下水量,也可以将它们做别的用途,但水就那么多放在那里,你可以把它放在任何一个水库慢慢用。另一类就是流式数据,就是会实时变动的数据,也打个比方,就像河流的水,它不是静止不动的,它是一直在流动的,每时每刻水都在发生改变,也就是你无法像之前一样,把它放在任何水库慢慢用,你只能取出一部分用,当你用完,新的水又流进来了。
kafka就是其中的一条河流,当然不只kafka而已,就像世界上存在很多河流,kafka只是其中之一。kafka是根据topic 来存储数据的,每个topic里面根据不同的偏移量来定位数据。比如下面的代码就是scala语言编写的往topic中发送数据,我们称之为制造者。
当我们成功往kafka topic中发送了一系列信息的时候,意味着我们定义了一个河流是怎么产生数据的,随时发送,随时更新topic中的数据,但是要怎么取出来使用呢?那就是kafka消费者的定义,接下来看一个消费者代码。这个是主要程序,里面只是包含怎么使用消费者。
注意:同一个组的消费者不会重复消费同一个topic里面的数据。
关于getContent 和 getScore代码如下:
获得分数是调用一个python接口,getScore函数如下:
关于kafka,其实我也不知道怎么样才能介绍的具体明了,但是我就是想提供一个简单的使用示例,让我们对kafka有个一初步的了解。其实吧,我们知道,数据可以从某个角度分成两类,第一类是批量数据,也就是已经放好的数据,打个比方,就像池塘的水,不会流动,你想怎么计算就怎么计算,你可以统计一下水量,也可以将它们做别的用途,但水就那么多放在那里,你可以把它放在任何一个水库慢慢用。另一类就是流式数据,就是会实时变动的数据,也打个比方,就像河流的水,它不是静止不动的,它是一直在流动的,每时每刻水都在发生改变,也就是你无法像之前一样,把它放在任何水库慢慢用,你只能取出一部分用,当你用完,新的水又流进来了。
kafka就是其中的一条河流,当然不只kafka而已,就像世界上存在很多河流,kafka只是其中之一。kafka是根据topic 来存储数据的,每个topic里面根据不同的偏移量来定位数据。比如下面的代码就是scala语言编写的往topic中发送数据,我们称之为制造者。
package kafka import java.util.Properties import kafka.producer.ProducerConfig import kafka.producer.Producer import kafka.producer.KeyedMessage object KafkaProducer { def sendmessage(presend:String,topic:String,key:String): Unit = { val brokers = "10.2.117.160:9092,10.2.117.162:9092" //定义broker节点 val props = new Properties() props.put("delete.topic.enable", "true") props.put("metadata.broker.list", brokers) props.put("serializer.class", "kafka.serializer.StringEncoder") props.put("partitioner.class", classOf[HashPartitioner].getName) props.put("queue.buffering.max.messages", "1000000") props.put("queue.enqueue.timeout.ms", "20000000") props.put("batch.num.messages", "1") props.put("producer.type", "sync") val config = new ProducerConfig(props) val producer = new Producer[String, String](config) // key 和 value 都是 String类型 val sleepFlag = true if (sleepFlag) Thread.sleep(1000) val message = new KeyedMessage[String, String](topic, key,presend.toString) producer.send(message) //发送到指定的topic } }以上代码定义了一个 sendmessage 函数,用来像指定的topic中发送key-value类信息。
当我们成功往kafka topic中发送了一系列信息的时候,意味着我们定义了一个河流是怎么产生数据的,随时发送,随时更新topic中的数据,但是要怎么取出来使用呢?那就是kafka消费者的定义,接下来看一个消费者代码。这个是主要程序,里面只是包含怎么使用消费者。
注意:同一个组的消费者不会重复消费同一个topic里面的数据。
package kafka import java.util.Properties import kafka.consumer.ConsumerConfig import kafka.consumer.Consumer import kafka.message.MessageAndMetadata import GetPost.getScore //这里是一个接口,我们从一个topic取数据,取完后调用这个接口得到分数后,发送到另一个topic import GetContent.getContent //解析json消息的处理函数 import KafkaProducer.sendmessage //处理后需要发送,所以引入上面定义的制造者代码 import sun.awt.SunHints.Key object kafka_consumer extends App{ var groupid = "oyp_test24" //kafka有一个注意点,就是同一个组的消费者不会再取topic中已经消费过的数据 var consumerid = "oyp_consumer" var topic = "ase-info" //取数据源的topic val props:Properties = new Properties() props.put("zookeeper.connect", "10.2.117.133:2181,10.2.117.160:2181,10.2.117.162:2181") props.put("group.id", groupid) props.put("client.id", "test") props.put("consumer.id", consumerid) props.put("auto.offset.reset", "smallest") props.put("auto.commit.enable", "true") props.put("auto.commit.interval.ms", "100") val consumerConfig:ConsumerConfig = new ConsumerConfig(props) val consumer = Consumer.create(consumerConfig) //创建一个消费者 val topicCountMap = Map(topic -> 1) val consumerMap = consumer.createMessageStreams(topicCountMap) val streams = consumerMap.get(topic).get //从topic中获取源数据 for (stream <- streams) { val it = stream.iterator() while (it.hasNext()) { val messageAndMetadata = it.next() // val message = s"Topic:${messageAndMetadata.topic}, PartitionID:${messageAndMetadata.partition}, " + // s"Offset:${messageAndMetadata.offset},Message Payload: ${new String(messageAndMetadata.message())}" val message:String = s"${new String(messageAndMetadata.message())}" val key:String = s"${messageAndMetadata.partition}".toString var presend :String = "" try { val content_text: String = getContent(message) //提取message中的json内容 if (content_text != "Invalid Map") { val result = getScore(content_text) //调用接口处理,获得分数 presend = message.substring(0,message.length - 1) + "," + result.substring(1) } } catch { case _ => presend = message } sendmessage(presend,"oyp-test-No1",key) //将处理后的消息presend 重新发送到另一个topic } } }上述代码就是从topic 为 ase-info中获取数据,是一个json串,再调用getContent方法解析里面的内容后,调用getScore方法获得分数,得到presend,再发送到另一个topic "oyp-test-No1" 。
关于getContent 和 getScore代码如下:
package kafka import scala.collection.mutable import scala.util.parsing.json import scala.util.parsing.json.JSONObject object GetContent { def getContent(json_text1:String):String = { val json_text = getJson_clear(json_text1) var result: String = "" var obj = json.JSON.parseFull(json_text) var text: String = "" var title: String = "" obj match { case Some(map: Map[Any, Any]) => { text = map.get("content").toString() text = text.substring(5, text.length() - 1) } case _ => text = "Invalid Map" } return (text.trim.replaceAll("%","个百分点")) } def getJson_clear(message:String) :String= { val start: Int = message.indexOf(""""content":""") if (start >= 0) { val end: 4000 Int = message.substring(start).indexOf(""""media":""") - 1 + start if (end >= 0) { return ("{" + message.substring(start, end) + "}") } else {return ("{"+message.substring(start))} } else {return message} } }这里的处理由于特殊的问题导致了一些特殊的处理,可以自己进行修改,反正功能是从json串获得 content的value值。
获得分数是调用一个python接口,getScore函数如下:
package kafka import org.apache.http.client.methods.HttpPost import org.apache.http.impl.client.DefaultHttpClient import scala.util.parsing.json.JSONObject import org.apache.http.entity.StringEntity import java.nio.charset.Charset object GetPost { def getScore(text:String) :String= { // create our object as a json string val spock = Map ("content" -> text.toString) val spockAsJson = new JSONObject(spock).toString() //println(text) val url = "http://10.2.145.116:8788/snownlp" val post = new HttpPost(url) val client = new DefaultHttpClient post.addHeader("Content-type","application/json; charset=utf-8") post.setHeader("Accept", "application/json") post.setEntity(new StringEntity(spockAsJson, Charset.forName("UTF-8"))) post.formatted(spockAsJson) println(post.getURI()) val response = client.execute(post) import org.apache.http.util.EntityUtils val body = EntityUtils.toString(response.getEntity()) return (body.toString) } }备注:在制造者中有一个 props.put("partitioner.class", classOf[HashPartitioner].getName),其中HashPartitioner代码如下
package kafka import kafka.producer.Partitioner import scala.math._ import kafka.utils.VerifiableProperties class HashPartitioner extends Partitioner { def this(verifiableProperties: VerifiableProperties) { this } override def partition(key: Any, numPartitions: Int): Int = { if (key.isInstanceOf[Int]) { abs(key.toString().toInt) % numPartitions } key.hashCode() % numPartitions } }
相关文章推荐
- scala 下 kafka 实战简介
- scala 下 kafka 实战简介
- scala 下 kafka 实战简介
- scala 下 kafka 实战简介
- scala 下 kafka 实战简介
- scala 下 kafka 实战简介
- scala 下 kafka 实战简介
- scala 下 kafka 实战简介
- scala 下 kafka 实战简介
- scala 下 kafka 实战简介
- scala 下 kafka 实战简介
- scala 下 kafka 实战简介
- scala 下 kafka 实战简介
- scala 下 kafka 实战简介
- scala 下 kafka 实战简介
- scala 下 kafka 实战简介
- scala 下 kafka 实战简介
- scala 下 kafka 实战简介
- 第114课(Scala版本)SparkStreaming+Kafka+Spark SQL+TopN+Mysql 电商广告点击综合案例实战
- scala.sys.process简介及实战