Spark Streaming 基本概念

介绍

Spark Streaming架构图

the micro-batch architecture of Spark Streaming

Spark Streaming 基本概念

Execution of Spark Streaming within Spark’s components

Spark Streaming 基本概念

JAVA代码示例

执行方式

1:修改log4j的日志级别为error,不然会打印太多的日志

2:将如下两个类导出一个jar

3: nc -lk ip port

3: 使用spark-submit提交任务 

spark-submit  --class com.spark.streaming.SimpleDemo test.jar

JAVA代码

import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.ConnectException; import java.net.Socket;  import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.receiver.Receiver;   /**  * DateTime: 2015年6月18日 下午5:08:20  *  */ public class JavaCustomReceiver extends Receiver<String> {      String host = null;     int port = -1;     Socket socket = null;     BufferedReader reader = null;       public JavaCustomReceiver(String host_, int port_) {         super(StorageLevel.MEMORY_ONLY());         host = host_;         port = port_;     }       public void onStart() {         // Start the thread that receives data over a connection         new Thread() {             @Override             public void run() {                 receive();             }         }.start();     }       public void onStop() {         try {             if (socket != null) {                 socket.close();             }             if (reader != null) {                 reader.close();             }         }         catch (IOException e) {             e.printStackTrace();         }     }       /** Create a socket connection and receive data until receiver is stopped */     private void receive() {         String userInput = null;         try {             // connect to the server             socket = new Socket(host, port);             reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));              // Until stopped or connection broken continue reading             while (!isStopped() && (userInput = reader.readLine()) != null) {                 System.out.println("Received data '" + userInput + "'");                 store(userInput);             }             reader.close();             socket.close();              // Restart in an attempt to connect again when server is active             // again             restart("Trying to connect again");         }         catch (ConnectException ce) {             // restart if could not connect to server             restart("Could not connect", ce);         }         catch (Throwable t) {             // restart if there is any other error             restart("Error receiving data", t);         }     }  }


发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注