1. Start spark-shell
[hadoop@hadoop001 bin]$ ./spark-shell --master local[2]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://hadoop001:4040
Spark context available as 'sc' (master = local[2], app id = local-1538087287382).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.3.1
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_45)
Type in expressions to have them evaluated.
Type :help for more information.
scala> import org.apache.spark._
import org.apache.spark._
scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._
scala> val ssc = new StreamingContext(sc,Seconds(5))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@734cf881
scala> val lines = ssc.textFileStream("/streaming/input/")
lines: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@4c4215d7
scala> val words = lines.flatMap(_.split("\t")).map((_,1)).reduceByKey(_+_)
words: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@4e1104f4
scala> words.print()
scala> ssc.start()
scala> ssc.awaitTermination()
-------------------------------------------
Time: 1538087585000 ms
-------------------------------------------
-------------------------------------------
Time: 1538087590000 ms
-------------------------------------------
-------------------------------------------
Time: 1538087595000 ms
-------------------------------------------
-------------------------------------------
Time: 1538087600000 ms
-------------------------------------------
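The per-batch transformation above is ordinary functional word counting. Its logic can be checked on a plain Scala collection (the sample lines here are hypothetical, not from the session); `reduceByKey` on a DStream corresponds to `groupBy` plus a sum on local collections:

```scala
// Sample batch of tab-separated lines (made-up data, mirroring the session's input format).
val lines = Seq("hello\tworld", "hello\thello\twelcome", "world")

// Same pipeline as lines.flatMap(_.split("\t")).map((_, 1)).reduceByKey(_ + _),
// expressed on a plain Scala collection: reduceByKey becomes groupBy + sum.
val counts = lines
  .flatMap(_.split("\t"))
  .map((_, 1))
  .groupBy(_._1)
  .map { case (word, ones) => (word, ones.map(_._2).sum) }

println(counts)
```

Each 5-second batch applies exactly this transformation to whatever new lines arrived in that window, which is why the empty batches above print nothing.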
2. Put files into the HDFS directory /streaming/input/
[hadoop@hadoop001 data]$ hadoop fs -ls /streaming/input/
Found 4 items
-rw-r--r-- 1 hadoop supergroup 44 2018-09-28 06:24 /streaming/input/1.txt
-rw-r--r-- 1 hadoop supergroup 44 2018-09-28 06:25 /streaming/input/2.txt
-rw-r--r-- 1 hadoop supergroup 44 2018-09-28 06:25 /streaming/input/3.txt
[hadoop@hadoop001 data]$ hadoop fs -put ruozeinput.txt /streaming/input/10.txt
3. Streaming output of the processing results
-------------------------------------------
Time: 1538087645000 ms
-------------------------------------------
-------------------------------------------
Time: 1538087650000 ms
-------------------------------------------
(hello,4)
(welcome,1)
(world,2)
-------------------------------------------
Time: 1538087655000 ms
-------------------------------------------
Spark Streaming only picks up files written to HDFS after it starts; the pre-existing 1.txt, 2.txt, and 3.txt are never read.
Note: "All files must be in the same data format." — every file in the monitored directory must use the same format.
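One way to keep the input uniform is to filter by file name. The Spark Streaming `fileStream` API accepts a path-filter parameter for this (the Spark-side wiring is not shown here); the filter itself is an ordinary predicate, sketched below against the paths from the listing above:

```scala
// Predicate that admits only .txt files; a function of this shape can be supplied
// as the path filter of StreamingContext.fileStream (Spark wiring not shown).
val isTxt: String => Boolean = path => path.endsWith(".txt")

println(isTxt("/streaming/input/10.txt")) // true: the tab-separated text file
println(isTxt("/streaming/input/11"))     // false: the parquet file put below
```

With such a filter in place, binary files accidentally dropped into the directory would simply be ignored instead of producing the garbled output shown next.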
[hadoop@hadoop001 resources]$ hadoop fs -put users.parquet /streaming/input/11
Putting a parquet file into /streaming/input/ makes the streaming job print garbled (binary) output:
-------------------------------------------
Time: 1538088485000 ms
-------------------------------------------
(\Hexample.avro.User
%name%
%fa+-_i+e_c-+-_%5fa+-_i+e_+++be__%a__ay<&
+a+eDH&P
fa+-_i+e_c-+-_<@&P&�%(fa+-_i+e_+++be__a__ay,1)
(PAR1"@A+y__aBe+,1)
(ZZ&��
a+_-._che+a�{"+y-e":"_ec-_d","+a+e":"U_e_","+a+e_-ace":"e|a+-+e.a+_-","fie+d_":[{"+a+e":"+a+e","+y-e":"_+_i+g"},{"+a+e":"fa+-_i+e_c-+-_","+y-e":["_+_i+g","++++"]},{"+a+e":"fa+-_i+e_+++be__","+y-e":{"+y-e":"a__ay","i+e+_":"i++"}}]}-a_-+e+-+_ +e__i-+ 1.4.3�PAR1,1)
(@,1)
(0red88,,1)
-------------------------------------------
Time: 1538088490000 ms
-------------------------------------------
[hadoop@hadoop001 resources]$ hadoop fs -put people.json /streaming/input/12
[hadoop@hadoop001 resources]$ hadoop fs -put people.csv /streaming/input/16
JSON and CSV files can be read as text, but the word count comes out wrong; only files of a consistent format can be counted correctly. In production, files of a single format are always dropped into the same directory.
-------------------------------------------
Time: 1538088650000 ms
-------------------------------------------
({"name":"Michael"},1)
({"name":"Andy", "age":30},1)
({"name":"Justin", "age":19},1)
-------------------------------------------
Time: 1538088655000 ms
-------------------------------------------
......
......
-------------------------------------------
Time: 1538089055000 ms
-------------------------------------------
(Jorge;30;Developer,1)
(Bob;32;Developer,1)
(name;age;job,1)
-------------------------------------------
Time: 1538089060000 ms
-------------------------------------------
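The CSV records above also show why the count is wrong: the job splits on "\t", so a semicolon-separated line never splits and the whole line is counted as one "word", e.g. (Jorge;30;Developer,1). A minimal check in plain Scala:

```scala
// A semicolon-separated CSV line contains no tab, so split("\t") leaves it intact
// and the word-count job emits the entire line as a single token.
val csvLine = "Jorge;30;Developer"
val tokens = csvLine.split("\t")

println(tokens.length) // 1: the whole line is one token
```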