没有合适的资源?快使用搜索试试~ 我知道了~
一、利用Java程序实现词频统计 二、利用Scala程序实现词频统计 三、利用Python程序实现词频统计 四、利用Akka和Scala实现词频统计 五、利用MapReduce实现词频统计 六、利用Hive实现词频统计 七、利用Storm实现词频统计 八、利用Spark实现词频统计
资源推荐
资源详情
资源评论
采用多种方式实现词频统计
采用多种方式实现词频统计采用多种方式实现词频统计
一、利用Java程序实现词频统计
二、利用Scala程序实现词频统计
三、利用Python程序实现词频统计
四、利用Akka和Scala实现词频统计
五、利用MapReduce实现词频统计
六、利用Hive实现词频统计
七、利用Storm实现词频统计
八、利用Spark实现词频统计
文本文件文本文件test.txt
一、利用一、利用Java程序实现词频统计程序实现词频统计
package net.hw.wc;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashMap;
import java.util.Map;
/**
* Created by howard on 2018/2/6.
*/
public class WordCount {
public static void main(String[] args) throws Exception {
BufferedReader br = new BufferedReader(new FileReader("test.txt"));
Map<String, Integer> map = new HashMap<>();
String nextLine = "";
while ((nextLine = br.readLine()) != null) {
String[] data = nextLine.split(" ");
for (String word : data) {
map.put(word, map.containsKey(word) ? map.get(word) + 1 : 1);
}
}
for (String key : map.keySet()) {
System.out.println(key + ": " + map.get(key));
}
}
}
二、利用二、利用scala程序实现词频统计程序实现词频统计
package net.hw.wc
import scala.io.Source
/**
* Created by howard on 2018/2/6.
*/
object WordCount {
def main(args: Array[String]): Unit = {
val rx = Source.fromFile("test.txt")
.getLines()
.toList.mkString(" ").split(" ")
.map((_, 1))
.groupBy(_._1)
.mapValues(_.map(_._2))
.mapValues(_.reduce(_ + _))
rx.foreach(println)
}
}
三、利用三、利用Python程序实现词频统计程序实现词频统计
file = open("test.txt", "r")
words = []
for line in file:
for word in line.replace('\n', '').split(" "):
words.append(word)
map = {}
for word in words:
map[word] = map[word] + 1 if word in map.keys() else 1
for key in map:
print(key + ": " + str(map[key]))
四、利用四、利用akka和和scala实现词频统计实现词频统计
((1)创建)创建AKKAUtils类,提供获取类,提供获取akka配置的函数配置的函数
package net.hw.akka.wc
import java.util.HashMap
import java.util.ArrayList
/**
* Created by howard on 2017/8/27.
*/
object AKKAUtils {
def getConf(ip: String, port: String): HashMap[String, Object] = {
val conf = new HashMap[String, Object]()
val list = new ArrayList[String]()
list.add("akka.remote.netty.tcp")
conf.put("akka.remote.enabled-transports", list)
conf.put("akka.actor.provider", "akka.remote.RemoteActorRefProvider")
conf.put("akka.remote.netty.tcp.hostname", ip)
conf.put("akka.remote.netty.tcp.port", port)
return conf
}
}
((2)创建)创建WcInfo1,封装从,封装从WcDriver发往发往WcMapper的数据的数据
package net.hw.akka.wc
/**
* Created by howard on 2017/8/27.
*/
case class WcInfo1(data: String, mapFunc: String => Array[(String, Int)],
reduceFunc: Array[(String, Int)] => Map[String, Int]) {
val datax = data
val mapFuncx = mapFunc
val reduceFuncx = reduceFunc
}
WcMapper接收字符串data,调用mapFunc进行处理,返回的是tuple数组arr: Array[(String, Int),于
是,WcReducer接收的参数就是arr: Array[(String, Int)。
((3)创建)创建WcInfo2,封装从,封装从WcDriver发往发往WcMapper的数据的数据
package net.hw.akka.wc
/**
* Created by howard on 2017/8/27.
*/
case class WcInfo2(arr: Array[(String, Int)],
reduceFunc: Array[(String, Int)] => Map[String, Int]) {
val arrx = arr
val reduceFuncx = reduceFunc
}
((4)创建)创建WcDriver
package net.hw.akka.wc
import java.util.Scanner
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory
/**
* Created by howard on 2017/8/27.
*/
object WcDriver {
def main(args: Array[String]): Unit = {
val sys = ActorSystem("myAkkaClientSys", ConfigFactory.parseMap(AKKAUtils.getConf("127.0.0.1", "44444")))
val scan = new Scanner(System.in)
while (true) {
val data = scan.nextLine();
val mapFunc = (line: String) => {
val arr = line.split(" ")
arr.map((_, 1))
}
val reduceFunc = (arr: Array[(String, Int)]) => {
arr.groupBy(_._1).mapValues(_.map(_._2)).mapValues(_.reduce(_ + _))
}
sys.actorSelection("akka.tcp://myAkkaServerSys@127.0.0.1:44443/user/mapperActor") ! new WcInfo1(data, mapFunc,
reduceFunc);
}
}
}
WcDriver获取了行信息之后,定义了两个函数。然后定义自己的ActorSystem对象sys,监听本机的44444端
口,然后往本机44443端口的mapperActor发送信息WcInfo1对象,封装了读取的行数据,以及对数据进行处
理的两个函数。
((5)创建)创建WcMapper
剩余27页未读,继续阅读
资源评论
howard2005
- 粉丝: 1w+
- 资源: 20
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功