ActorWordCountDemo.scala
import java.io.File
import com.sensetime.{ReceiveWord, SendFile, StopMessage}
import scala.actors.Actor
import scala.collection.mutable.ListBuffer
import scala.io.Source
/**
 * Worker actor that performs a word count on individual files.
 *
 * Protocol:
 *  - `SendFile(file)`: reads the file as UTF-8, counts every space-separated
 *    token, and replies to the sender with `ReceiveWord(counts)`.
 *  - `StopMessage`: prints "Parse Stopped" and ends the actor's message loop.
 */
object CountActor extends Actor {

  /**
   * Counts how many times each space-separated token occurs in `file`.
   *
   * @param file the file to read, decoded as UTF-8
   * @return a strict (fully evaluated) map from token to occurrence count
   */
  private def countWords(file: File): Map[String, Int] = {
    val source = Source.fromFile(file, "utf-8")
    try {
      // getLines() is lazy, so everything is materialised before the source
      // is closed in the finally block.
      source.getLines()
        .flatMap(_.split(" "))
        .toList
        .groupBy(identity)
        // A strict map (rather than mapValues, which yields a lazy view) so the
        // message handed to another actor does not retain the grouped word lists.
        .map { case (word, occurrences) => word -> occurrences.size }
    } finally {
      // Always release the file handle, even when reading fails.
      source.close()
    }
  }

  override def act(): Unit = {
    // receive handles one message per call, so it is wrapped in a loop; a flag
    // ends the loop instead of a nonlocal `return` from inside the handler.
    var running = true
    while (running) {
      receive {
        // Word-count request: process the file and reply to the requester.
        case SendFile(file) =>
          sender ! ReceiveWord(countWords(file))
        // Stop request: leave the loop so act() returns.
        case StopMessage =>
          println("Parse Stopped")
          running = false
      }
    }
  }
}
/**
 * Coordinator actor: sends every regular file of a directory to [[CountActor]],
 * collects the per-file word counts, merges them and prints the total.
 *
 * Constructed without arguments it has no files to process; use the
 * `dirName` constructor to populate `files`.
 */
class WordActor extends Actor {
  /** The worker actor that performs the per-file counting. */
  val countActor = CountActor
  /** Per-file results received so far. */
  var results: ListBuffer[Map[String, Int]] = ListBuffer[Map[String, Int]]()
  /** Files to be counted; empty (never null) until a directory is supplied. */
  var files: List[File] = Nil

  /**
   * Creates a coordinator for the regular files directly inside `dirName`.
   *
   * @param dirName path of the directory to scan; if it does not exist or is
   *                not a directory, `files` stays empty
   */
  def this(dirName: String) = {
    this()
    // File.listFiles returns null when the path is missing or not a directory;
    // Option(...) turns that into an empty list instead of an NPE.
    files = Option(new File(dirName).listFiles)
      .map(_.filter(_.isFile).toList)
      .getOrElse(Nil)
  }

  override def act(): Unit = {
    // Dispatch every file to the counting actor.
    files.foreach(file => countActor ! SendFile(file))

    // Wait for exactly one reply per dispatched file.
    var pending = files.length
    while (pending > 0) {
      receive {
        case ReceiveWord(result) =>
          results += result
          pending -= 1
      }
    }

    // All replies are in: tell the worker to stop.
    countActor ! StopMessage

    // Merge the per-file maps by summing the counts of identical words.
    val totals = results.flatten
      .groupBy(_._1)
      .map { case (word, pairs) => word -> pairs.map(_._2).sum }
    println(s"最终结果: $totals")
  }
}
/** Companion of the WordActor class, offering a `new`-free factory. */
object WordActor {
  /**
   * Builds a WordActor through its `dirName` constructor.
   *
   * @param dirName directory path passed unchanged to the constructor
   * @return the newly created (not yet started) actor
   */
  def apply(dirName: String): WordActor = {
    val actor = new WordActor(dirName)
    actor
  }
}
/** Entry point of the actor-based word-count demo. */
object ActorWordCountDemo {
  /** Directory scanned when no command-line argument is given. */
  private val DefaultDir = "D:/wordcount"

  /**
   * Starts the counting worker and the coordinator.
   *
   * @param args optional; `args(0)` is the directory to scan, falling back to
   *             the default directory when absent
   */
  def main(args: Array[String]): Unit = {
    val dirName = args.headOption.getOrElse(DefaultDir)
    // Start the worker first so it is already running when requests are sent.
    CountActor.start()
    WordActor(dirName).start()
  }
}
Messages.scala
package com.sensetime
import java.io.File
/** Request asking the counting actor to word-count a single file. */
case class SendFile(file: File)
/** Reply carrying the word-to-occurrence-count map computed for one file. */
case class ReceiveWord(result: Map[String, Int])
/**
 * Signal telling the counting actor to stop.
 *
 * A case object rather than a parameterless case class: it is sent and
 * pattern-matched as a single value (`StopMessage`), and a case class without
 * a parameter list is deprecated in Scala 2.10 and rejected by later compilers.
 */
case object StopMessage
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven project descriptor for the actor-based word-count demo. -->
<!-- NOTE(review): no Scala compiler plugin is configured here; presumably the
     sources are compiled by the IDE - confirm before building with plain Maven. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Project coordinates. -->
<groupId>org.example</groupId>
<artifactId>actor-word-count</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- Supplies the scala.actors package imported by ActorWordCountDemo.scala. -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-actors</artifactId>
<version>2.10.5</version>
</dependency>
</dependencies>
</project>
程序输出如下:
图片.png











网友评论