1. 云栖社区>
  2. >
  3. 正文

Akka MapReduce 去停词单词统计

作者:用户 来源:互联网 时间:2018-09-04 19:23:57

编程基础akka

Akka MapReduce 去停词单词统计 - 摘要: 本文讲的是Akka MapReduce 去停词单词统计, 代码转载自 https://github.com/write2munish/Akka-Essentials WordCountMapReduce WordCountMapReduce : This examples

代码转载自

https://github.com/write2munish/Akka-Essentials

WordCountMapReduce


WordCountMapReduce: This example implements the Word Count MapReduce model. The client system reads a text file and sends each line of text as a message to the server. The server reads the line, maps the words, reduces the words, and finally performs an in-memory aggregation of the result. The example also implements a priority mailbox, which is used to separate the MapReduce requests from the request that fetches the list of results from the aggregate actor.


client端代码

package org.akka.essentials.wc.mapreduce.example.client;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import akka.kernel.Bootable;

import com.typesafe.config.ConfigFactory;

/**
 * Command-line entry point of the word-count client.
 *
 * <p>It starts a local actor system, asks a {@link FileReadActor} to read the
 * input file, and routes every line it produces through a {@link ClientActor}
 * to the remote {@code WCMapReduceActor}.
 */
public class Client implements Bootable {

	/** Classpath resource whose lines are sent to the server. */
	private static final String INPUT_FILE = "Othello.txt";

	/** Remote path of the server-side entry actor. */
	private static final String SERVER_PATH =
			"akka://WCMapReduceApp@127.0.0.1:2552/user/WCMapReduceActor";

	/**
	 * Wires the client actors together and kicks off the word count.
	 *
	 * @param args command-line arguments; not used
	 */
	public static void main(String[] args) {
		ActorSystem clientSystem = ActorSystem.create("ClientApplication",
				ConfigFactory.load().getConfig("WCMapReduceClientApp"));

		ActorRef reader = clientSystem.actorOf(new Props(FileReadActor.class));
		ActorRef server = clientSystem.actorFor(SERVER_PATH);
		ActorRef forwarder = clientSystem.actorOf(forwarderProps(server));

		// The reader answers to its sender, so passing the forwarder as sender
		// makes every line flow reader -> forwarder -> server.
		reader.tell(INPUT_FILE, forwarder);

		// Ask the server to print the aggregated counts.
		server.tell("DISPLAY_LIST");

		// NOTE(review): shutdown is requested right after the asynchronous
		// tells above; confirm that pending messages are still delivered.
		clientSystem.shutdown();
	}

	/**
	 * Builds the props for a {@link ClientActor} that forwards to the given server.
	 *
	 * @param server reference to the remote entry actor
	 * @return props whose factory creates the forwarding actor
	 */
	@SuppressWarnings("serial")
	private static Props forwarderProps(final ActorRef server) {
		return new Props(new UntypedActorFactory() {
			public UntypedActor create() {
				return new ClientActor(server);
			}
		});
	}

	/** Bootable hook; this client does nothing here. */
	public void shutdown() {
		// no-op
	}

	/** Bootable hook; this client does nothing here (all work happens in main). */
	public void startup() {
		// no-op
	}

}

package org.akka.essentials.wc.mapreduce.example.client;

/**
 * Created by admin on 2017/3/21.
 */

import akka.actor.ActorRef;
import akka.actor.UntypedActor;

/**
 * Relays every {@code String} message it receives to the remote server actor
 * and reports its own lifetime, in whole seconds, when it stops.
 */
public class ClientActor extends UntypedActor {

    /** Destination of all forwarded messages. */
    private ActorRef remoteServer;
    /** Never assigned or read in this class. */
    private ActorRef fileReadActor;
    /** Wall-clock time, in milliseconds, captured in {@link #preStart()}. */
    private long start;

    /**
     * @param inRemoteServer actor that receives the forwarded messages
     */
    public ClientActor(ActorRef inRemoteServer) {
        remoteServer = inRemoteServer;
    }

    /**
     * Forwards {@code String} messages to the remote server; any other
     * message type is dropped without notice.
     */
    @Override
    public void onReceive(Object message) {
        if (!(message instanceof String)) {
            return;
        }
        String text = (String) message;
        remoteServer.tell(text);
    }

    /** Records the start time used by {@link #postStop()}. */
    @Override
    public void preStart() {
        start = System.currentTimeMillis();
    }

    /** Prints how many whole seconds elapsed since {@link #preStart()}. */
    @Override
    public void postStop() {
        long elapsedMillis = System.currentTimeMillis() - start;
        long elapsedSeconds = elapsedMillis / 1000;
        String report = String.format("\n\tClientActor estimate: \t\t\n\tCalculation time: \t%s Secs", elapsedSeconds);
        System.out.println(report);
    }

}

package org.akka.essentials.wc.mapreduce.example.client;

/**
 * Created by admin on 2017/3/21.
 */
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import akka.actor.UntypedActor;

/**
 * Reads a classpath resource line by line and sends each line back to the
 * actor that requested the read, followed by a final {@code "EOF"} marker.
 */
public class FileReadActor extends UntypedActor {

    /**
     * Handles a read request.
     *
     * <p>The message must be a {@code String} naming a resource that the
     * thread's context class loader can resolve. Every line is echoed to
     * standard output and sent to the sender; after the last line the string
     * {@code "EOF"} is sent. I/O failures, including a missing resource, are
     * reported on standard error and no {@code "EOF"} is sent in that case.
     *
     * @param message the resource name to read
     * @throws IllegalArgumentException if the message is not a {@code String}
     */
    @Override
    public void onReceive(Object message) throws IOException {
        if (!(message instanceof String)) {
            throw new IllegalArgumentException("Unknown message [" + message + "]");
        }

        String fileName = (String) message;
        try {
            java.net.URL resource =
                    Thread.currentThread().getContextClassLoader().getResource(fileName);
            if (resource == null) {
                // getResource returns null for an unknown name; report it through
                // the regular I/O error path instead of a NullPointerException.
                throw new IOException("Resource not found on classpath: " + fileName);
            }

            // try-with-resources guarantees the stream is closed, even on failure.
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(resource.openStream()))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println("line :");
                    System.out.println(line);

                    getSender().tell(line);
                }
                System.out.println("All lines send !");
                getSender().tell("EOF");
            }
        } catch (IOException x) {
            System.err.format("IOException: %s%n", x);
        }
    }
}

server端代码

package org.akka.essentials.wc.mapreduce.example.server;

/**
 * Created by admin on 2017/3/21.
 */

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import akka.actor.UntypedActor;

/**
 * Keeps the running word totals in memory.
 *
 * <p>{@code Map} messages are merged into the totals; the string
 * {@code "DISPLAY_LIST"} prints the totals to standard output. Every other
 * message is ignored.
 */
public class AggregateActor extends UntypedActor {
    /** Word to total count across all maps received so far. */
    private Map<String, Integer> finalReducedMap = new HashMap<>();

    /**
     * Dispatches on the message type as described on the class.
     *
     * @param message a partial count map or the display command
     */
    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof Map) {
            @SuppressWarnings("unchecked")
            Map<String, Integer> partialCounts = (Map<String, Integer>) message;
            aggregateInMemoryReduce(partialCounts);
            return;
        }
        if (message instanceof String && "DISPLAY_LIST".equals(message)) {
            System.out.println(finalReducedMap.toString());
        }
    }

    /**
     * Adds each count of {@code reducedList} to the total stored for the same word.
     *
     * @param reducedList partial word counts to fold into the totals
     */
    private void aggregateInMemoryReduce(Map<String, Integer> reducedList) {
        for (Map.Entry<String, Integer> entry : reducedList.entrySet()) {
            String word = entry.getKey();
            Integer incoming = entry.getValue();
            if (!finalReducedMap.containsKey(word)) {
                // First sighting of this word: store the count unchanged.
                finalReducedMap.put(word, incoming);
                continue;
            }
            Integer total = incoming + finalReducedMap.get(word);
            finalReducedMap.put(word, total);
        }
    }

}


package org.akka.essentials.wc.mapreduce.example.server;

/**
 * Created by admin on 2017/3/21.
 */

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.StringTokenizer;

import akka.actor.ActorRef;
import akka.actor.UntypedActor;

/**
 * Map step of the word count.
 *
 * <p>Each {@code String} message is treated as one line of text. The line is
 * split on whitespace, lower-cased, filtered against the stop-word list, and
 * the surviving words are sent to the reduce actor as a list of
 * {@link Result}s with a count of one each.
 */
public class MapActor extends UntypedActor {
    /** Words excluded from the count. Shared by all instances. */
    static final String[] STOP_WORDS = { "a", "about", "above", "above", "across", "after",
            "afterwards", "again", "against", "all", "almost", "alone",
            "along", "already", "also", "although", "always", "am", "among",
            "amongst", "amoungst", "amount", "an", "and", "another", "any",
            "anyhow", "anyone", "anything", "anyway", "anywhere", "are",
            "around", "as", "at", "back", "be", "became", "because", "become",
            "becomes", "becoming", "been", "before", "beforehand", "behind",
            "being", "below", "beside", "besides", "between", "beyond", "bill",
            "both", "bottom", "but", "by", "call", "can", "cannot", "cant",
            "co", "con", "could", "couldnt", "cry", "de", "describe", "detail",
            "do", "done", "down", "due", "during", "each", "eg", "eight",
            "either", "eleven", "else", "elsewhere", "empty", "enough", "etc",
            "even", "ever", "every", "everyone", "everything", "everywhere",
            "except", "few", "fifteen", "fify", "fill", "find", "fire",
            "first", "five", "for", "former", "formerly", "forty", "found",
            "four", "from", "front", "full", "further", "get", "give", "go",
            "had", "has", "hasnt", "have", "he", "hence", "her", "here",
            "hereafter", "hereby", "herein", "hereupon", "hers", "herself",
            "him", "himself", "his", "how", "however", "hundred", "ie", "if",
            "in", "inc", "indeed", "interest", "into", "is", "it", "its",
            "itself", "keep", "last", "latter", "latterly", "least", "less",
            "ltd", "made", "many", "may", "me", "meanwhile", "might", "mill",
            "mine", "more", "moreover", "most", "mostly", "move", "much",
            "must", "my", "myself", "name", "namely", "neither", "never",
            "nevertheless", "next", "nine", "no", "nobody", "none", "noone",
            "nor", "not", "nothing", "now", "nowhere", "of", "off", "often",
            "on", "once", "one", "only", "onto", "or", "other", "others",
            "otherwise", "our", "ours", "ourselves", "out", "over", "own",
            "part", "per", "perhaps", "please", "put", "rather", "re", "same",
            "see", "seem", "seemed", "seeming", "seems", "serious", "several",
            "she", "should", "show", "side", "since", "sincere", "six",
            "sixty", "so", "some", "somehow", "someone", "something",
            "sometime", "sometimes", "somewhere", "still", "such", "system",
            "take", "ten", "than", "that", "the", "their", "them",
            "themselves", "then", "thence", "there", "thereafter", "thereby",
            "therefore", "therein", "thereupon", "these", "they", "thickv",
            "thin", "third", "this", "those", "though", "three", "through",
            "throughout", "thru", "thus", "to", "together", "too", "top",
            "toward", "towards", "twelve", "twenty", "two", "un", "under",
            "until", "up", "upon", "us", "very", "via", "was", "we", "well",
            "were", "what", "whatever", "when", "whence", "whenever", "where",
            "whereafter", "whereas", "whereby", "wherein", "whereupon",
            "wherever", "whether", "which", "while", "whither", "who",
            "whoever", "whole", "whom", "whose", "why", "will", "with",
            "within", "without", "would", "yet", "you", "your", "yours",
            "yourself", "yourselves", "the" };

    /** List view of {@link #STOP_WORDS}, kept for existing users of this field. */
    static final List<String> STOP_WORDS_LIST = Arrays.asList(STOP_WORDS);

    /** Hash-based copy of the stop words so each lookup avoids a linear scan. */
    private static final Set<String> STOP_WORD_SET = new HashSet<>(STOP_WORDS_LIST);

    /** Actor that receives the mapped words. */
    private ActorRef actor = null;

    /**
     * @param inReduceActor actor to which the mapped word lists are sent
     */
    public MapActor(ActorRef inReduceActor) {
        actor = inReduceActor;
    }

    /**
     * Splits a line into countable words.
     *
     * <p>Tokens are separated by whitespace only, so a token with attached
     * punctuation or digits fails {@link #isAlpha(String)} and is skipped.
     *
     * @param line one line of input text
     * @return one {@link Result} with count 1 per accepted token, in input order
     */
    private List<Result> evaluateExpression(String line) {
        List<Result> list = new ArrayList<>();
        StringTokenizer parser = new StringTokenizer(line);
        while (parser.hasMoreTokens()) {
            // Locale.ROOT keeps the casing independent of the JVM's default
            // locale (e.g. Turkish maps "I" to a dotless "ı").
            String word = parser.nextToken().toLowerCase(Locale.ROOT);
            if (isAlpha(word) && !STOP_WORD_SET.contains(word)) {
                list.add(new Result(word, 1));
            }
        }
        return list;
    }

    /**
     * Tells whether the upper-cased form of {@code s} consists solely of the
     * letters A-Z.
     *
     * @param s token to check
     * @return {@code true} if every upper-cased character is in A-Z
     */
    private boolean isAlpha(String s) {
        String upper = s.toUpperCase(Locale.ROOT);
        for (int i = 0; i < upper.length(); i++) {
            char c = upper.charAt(i);
            if (c < 'A' || c > 'Z') {
                return false;
            }
        }
        return true;
    }

    /**
     * Maps a line of text and sends the resulting word list to the reduce actor.
     *
     * @param message the line to map
     * @throws IllegalArgumentException if the message is not a {@code String}
     */
    @Override
    public void onReceive(Object message) {
        if (message instanceof String) {
            String work = (String) message;
            List<Result> list = evaluateExpression(work);
            actor.tell(list);
        } else {
            throw new IllegalArgumentException("Unknown message [" + message + "]");
        }
    }
}

package org.akka.essentials.wc.mapreduce.example.server;

/**
 * Created by admin on 2017/3/21.
 */
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

import akka.actor.ActorRef;
import akka.actor.UntypedActor;

/**
 * Reduce step of the word count.
 *
 * <p>Receives the list of {@link Result}s produced for one line, collapses it
 * into a word-to-count map, and sends that map to the aggregate actor.
 */
public class ReduceActor extends UntypedActor {
    /** Actor that receives the reduced maps. */
    private ActorRef actor = null;

    /**
     * @param inAggregateActor actor to which the reduced maps are sent
     */
    public ReduceActor(ActorRef inAggregateActor) {
        actor = inAggregateActor;
    }

    /**
     * Reduces a mapped word list and forwards the result.
     *
     * @param message a {@code List<Result>} as sent by the map step
     * @throws IllegalArgumentException if the message is not a {@code List}
     */
    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof List) {
            // The map step only ever sends List<Result>; the element type
            // cannot be checked at runtime because of erasure.
            @SuppressWarnings("unchecked")
            List<Result> mapped = (List<Result>) message;
            actor.tell(reduce(mapped));
        } else {
            throw new IllegalArgumentException("Unknown message [" + message + "]");
        }
    }

    /**
     * Counts how often each word occurs in {@code list}.
     *
     * <p>Every entry counts as one occurrence; the count stored in the
     * {@link Result} itself is not consulted.
     *
     * @param list mapped words, possibly containing repeats
     * @return map from word to number of occurrences, ordered by word
     */
    private NavigableMap<String, Integer> reduce(List<Result> list) {
        NavigableMap<String, Integer> reducedMap = new ConcurrentSkipListMap<>();
        for (Result result : list) {
            String word = result.getWord();
            Integer seen = reducedMap.get(word);
            reducedMap.put(word, seen == null ? Integer.valueOf(1) : Integer.valueOf(seen + 1));
        }
        return reducedMap;
    }

}

package org.akka.essentials.wc.mapreduce.example.server;

/**
 * Created by admin on 2017/3/21.
 */

import java.io.Serializable;

/**
 * A word paired with an occurrence count; the message payload passed from the
 * map step to the reduce step.
 *
 * <p>The field names are left untouched because Java serialization records
 * fields by name.
 */
public class Result implements Serializable {
    private static final long serialVersionUID = 57L;

    /** The counted word. */
    private String word;
    /** Number of occurrences attributed to {@link #word}. */
    private int no_of_instsnces;

    /**
     * Creates a result by delegating to the two setters.
     *
     * @param word            the counted word
     * @param no_of_instsnces number of occurrences of the word
     */
    public Result(String word, int no_of_instsnces) {
        setWord(word);
        setNoOfInstsnces(no_of_instsnces);
    }

    /** @return the counted word */
    public String getWord() {
        return this.word;
    }

    /** @param word the counted word */
    public void setWord(String word) {
        this.word = word;
    }

    /** @return number of occurrences of the word */
    public int getNoOfInstsnces() {
        return this.no_of_instsnces;
    }

    /** @param no_of_instsnces number of occurrences of the word */
    public void setNoOfInstsnces(int no_of_instsnces) {
        this.no_of_instsnces = no_of_instsnces;
    }

}

package org.akka.essentials.wc.mapreduce.example.server;

/**
 * Created by admin on 2017/3/21.
 */

import akka.actor.ActorRef;
import akka.actor.UntypedActor;

/**
 * Server-side entry actor.
 *
 * <p>The string {@code "DISPLAY_LIST"} is passed on to the aggregate actor;
 * every other {@code String} is handed to the map router as a line of text.
 * Non-string messages are ignored.
 */
public class WCMapReduceActor extends UntypedActor {
    /** Router in front of the map workers. */
    private ActorRef mapRouter;
    /** Actor holding the aggregated counts. */
    private ActorRef aggregateActor;

    /**
     * @param inAggregateActor actor that handles the display command
     * @param inMapRouter      router that receives the text lines
     */
    public WCMapReduceActor(ActorRef inAggregateActor, ActorRef inMapRouter) {
        aggregateActor = inAggregateActor;
        mapRouter = inMapRouter;
    }

    /**
     * Routes a message as described on the class.
     *
     * @param message the display command or a line of text
     */
    @Override
    public void onReceive(Object message) {
        if (!(message instanceof String)) {
            return;
        }

        boolean displayRequest = "DISPLAY_LIST".equals(message);
        if (!displayRequest) {
            mapRouter.tell(message);
            return;
        }

        System.out.println("Got Display Message");
        // Pass the original sender along with the display command.
        aggregateActor.tell(message, getSender());
    }

}

package org.akka.essentials.wc.mapreduce.example.server;

/**
 * Created by admin on 2017/3/21.
 */
import akka.actor.*;
import akka.dispatch.PriorityGenerator;
import akka.dispatch.UnboundedPriorityMailbox;
import akka.kernel.Bootable;
import akka.routing.RoundRobinRouter;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

/**
 * Server side of the word count: builds the actor system together with the
 * aggregate actor, the reduce and map routers, and the named entry actor
 * {@code WCMapReduceActor}.
 */
public class WCMapReduceServer implements Bootable {

    private ActorRef mapRouter;
    private ActorRef reduceRouter;
    private ActorRef aggregateActor;
    private ActorSystem system;

    private ActorRef wcMapReduceActor;

    /**
     * Creates the actor system and wires the pipeline. Actors are created
     * downstream-first because each stage needs a reference to the next one.
     *
     * @param no_of_reduce_workers size passed to the reduce round-robin router
     * @param no_of_map_workers    size passed to the map round-robin router
     */
    public WCMapReduceServer(int no_of_reduce_workers, int no_of_map_workers) {
        Config serverConfig = ConfigFactory.load().getConfig("WCMapReduceApp");
        system = ActorSystem.create("WCMapReduceApp", serverConfig);

        aggregateActor = system.actorOf(new Props(AggregateActor.class));

        Props reducers = reducerProps().withRouter(new RoundRobinRouter(no_of_reduce_workers));
        reduceRouter = system.actorOf(reducers);

        Props mappers = mapperProps().withRouter(new RoundRobinRouter(no_of_map_workers));
        mapRouter = system.actorOf(mappers);

        Props entry = entryProps().withDispatcher("priorityMailBox-dispatcher");
        wcMapReduceActor = system.actorOf(entry, "WCMapReduceActor");
    }

    /** @return props creating reduce workers that feed the aggregate actor */
    private Props reducerProps() {
        return new Props(new UntypedActorFactory() {
            public UntypedActor create() throws Exception {
                return new ReduceActor(aggregateActor);
            }
        });
    }

    /** @return props creating map workers that feed the reduce router */
    private Props mapperProps() {
        return new Props(new UntypedActorFactory() {
            public UntypedActor create() throws Exception {
                return new MapActor(reduceRouter);
            }
        });
    }

    /** @return props creating the entry actor in front of the pipeline */
    private Props entryProps() {
        return new Props(new UntypedActorFactory() {
            public UntypedActor create() throws Exception {
                return new WCMapReduceActor(aggregateActor, mapRouter);
            }
        });
    }

    /** Starts a server with five reduce workers and five map workers. */
    public static void main(String[] args) {
        new WCMapReduceServer(5, 5);
    }

    /** Bootable hook; nothing to do because the constructor does the setup. */
    public void startup() {
    }

    /** Bootable hook; intentionally empty. */
    public void shutdown() {
    }

    /**
     * Mailbox that ranks messages by type: ordinary messages get 0, the
     * display command 2, and {@code PoisonPill} 3.
     *
     * <p>NOTE(review): with Akka's priority mailbox a lower number is expected
     * to be dequeued first, so the display command would run after the queued
     * text lines and just ahead of a PoisonPill. Confirm against the Akka
     * version in use.
     */
    public static class MyPriorityMailBox extends UnboundedPriorityMailbox {

        private static final int PRIORITY_DEFAULT = 0;
        private static final int PRIORITY_DISPLAY = 2;
        private static final int PRIORITY_POISON_PILL = 3;

        /**
         * @param settings actor system settings; not used here
         * @param config   dispatcher configuration; not used here
         */
        public MyPriorityMailBox(ActorSystem.Settings settings, Config config) {
            super(new PriorityGenerator() {
                @Override
                public int gen(Object message) {
                    if (message.equals("DISPLAY_LIST")) {
                        return PRIORITY_DISPLAY;
                    }
                    if (message.equals(PoisonPill.getInstance())) {
                        return PRIORITY_POISON_PILL;
                    }
                    return PRIORITY_DEFAULT;
                }
            });
        }

    }

}

配置文件

# Server-side settings; the server loads this section by name with
# ConfigFactory.load().getConfig("WCMapReduceApp").
WCMapReduceApp{
 # Pulls in a configuration named "common" (its contents are not shown here).
 include "common"
  akka {
  	actor {
    	provider = "akka.remote.RemoteActorRefProvider"
  	}
   remote {
    transport = "akka.remote.netty.NettyRemoteTransport"
    netty {
      # Host and port match the address in the client's actorFor lookup.
      hostname = "127.0.0.1"
      port = 2552
    }
 	}
  }
  # Dispatcher id the server passes to withDispatcher(...) for the entry actor;
  # mailbox-type names the nested MyPriorityMailBox class of WCMapReduceServer.
  priorityMailBox-dispatcher {
  	mailbox-type = "org.akka.essentials.wc.mapreduce.example.server.WCMapReduceServer$MyPriorityMailBox"
  }
}

# Client-side settings; the client loads this section by name with
# ConfigFactory.load().getConfig("WCMapReduceClientApp").
WCMapReduceClientApp{
 # Pulls in a configuration named "common" (its contents are not shown here).
 include "common"
  akka {
  	actor {
    	# Same remote-capable provider as the server section.
    	provider = "akka.remote.RemoteActorRefProvider"
  	}
  }	
}

说明:

微内核的作用是用于单独部署应用 一般另一个可选的在web端部署应用的
方法是如同一般的java web 应用一样部署在一个容器内

一般地Akka的一个相互勾连的Actors 系统可以作为独立地处理
代码逻辑的系统
而对于 两个Akka系统间的通信 一般是由 actorFor来取到 另一个
Akka系统的相关Actor 这种方式常常 应用在client server通信
中 两个群组都是Akka群组 但仍要进行通信。

在有关路径(path)的设置中 user一般是作为公共的主Actor被默认设置的。

默认情况下 akka中 进程的终止是靠message来沟通进行的
一般地 会自动调用 gracefulStop 函数 向message队列中插入
PoisonPill 实例 当识别此实例后 进行终止化程序。
故也可以利用此种特性 对于优先级进行修改
对于 PriorityGenerator类中的 int gen(Object message) 方法
返回不同的整数值 来设定 message的优先级
在本例中就实现了将打印的优先级设为在仅仅位于 收到PoisonPill之前




以上是Akka MapReduce 去停词单词统计的全部内容,在云栖社区的博客、问答、云栖号、人物、课程等栏目也有Akka MapReduce 去停词单词统计的相关内容,欢迎继续使用右上角搜索按钮进行搜索编程基础 akka ,以便于您获取更多的相关知识。

弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率

40+云计算产品,6个月免费体验

现在注册,免费体验40+云产品,及域名优惠!

云服务器9.9元/月,大学必备