<dependency>
-
<groupId>org.apache.spark</groupId>
-
<artifactId>spark-streaming_2.11</artifactId>
-
<version>2.1.0</version>
</dependency>
-
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.UUID;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;
-
-
public class SparkStreamTest {
-
-
public static void main(String[] args) throws Exception {
-
startSockerServer(9999);
-
-
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
-
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
-
JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
-
-
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
-
JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
-
JavaPairDStream<String, Integer> counts = pairs.reduceByKey((x, y) -> x + y);
-
counts.print();
-
-
jssc.start();
-
jssc.awaitTermination();
-
jssc.close();
-
}
-
-
private static void startSockerServer(int port) throws Exception {
-
new Thread(){
-
public void run() {
-
try {
-
ServerSocket server = new ServerSocket(port);
-
Socket socket = server.accept();
-
OutputStream os = socket.getOutputStream();
-
for (int i = 0; i < 1000; i++) {
-
String txt = UUID.randomUUID().toString().replaceAll("-", " ");
-
txt = "Hello test\n";
-
os.write(txt.getBytes());
-
Thread.sleep(10);
-
}
-
socket.close();
-
server.close();
-
} catch (Exception e) {
-
e.printStackTrace();
-
}
-
}
- }.start();
- }
- }
-------------------------------------------
Time: 1490947248000 ms
-------------------------------------------
(Hello,99)
(test,99)