1. 云栖社区>
  2. >
  3. 正文

Akka MapReduce 去停词单词统计

作者:用户 来源:互联网 时间:2018-09-04 19:23:57

编程基础akka

Akka MapReduce 去停词单词统计 - 摘要: 本文讲的是Akka MapReduce 去停词单词统计, 代码转载自 https://github.com/write2munish/Akka-Essentials WordCountMapReduce WordCountMapReduce : This examples

代码转载自

https://github.com/write2munish/Akka-Essentials

WordCountMapReduce


WordCountMapReduce: This example implements the Word Count MapReduce model. The client system reads a text file and sends each line of text as a message to the server. The server receives each line, maps its words, reduces them, and finally aggregates the results in memory. The example also implements a priority mailbox, which ensures that the MapReduce (text line) requests are processed ahead of the request that fetches the result list from the aggregate actor.


client端代码

package org.akka.essentials.wc.mapreduce.example.client;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import akka.kernel.Bootable;

import com.typesafe.config.ConfigFactory;

/**
 * Standalone client for the word-count example: reads the classpath resource
 * "Othello.txt" and streams its lines to the remote WCMapReduceActor.
 */
public class Client implements Bootable {

	/**
	 * Entry point: creates the client actor system, starts reading the file,
	 * asks the server to print its totals, then requests shutdown.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {

		// Classpath resource that FileReadActor opens and reads line by line.
		final String fileName = "Othello.txt";

		// Client-side system, configured from the "WCMapReduceClientApp" section.
		ActorSystem system = ActorSystem.create("ClientApplication",
				ConfigFactory.load().getConfig("WCMapReduceClientApp"));

		final ActorRef fileReadActor = system.actorOf(new Props(
				FileReadActor.class));

		// Reference to the server's WCMapReduceActor; host and port must match the
		// server's remote settings in the "WCMapReduceApp" config section.
		final ActorRef remoteActor = system
				.actorFor("akka://WCMapReduceApp@127.0.0.1:2552/user/WCMapReduceActor");

		// ClientActor forwards the lines it receives to remoteActor.
		@SuppressWarnings("serial")
		ActorRef actor = system.actorOf(new Props(new UntypedActorFactory() {
			public UntypedActor create() {
				return new ClientActor(remoteActor);
			}
		}));

		// FileReadActor replies to its sender (`actor`) with one message per line.
		fileReadActor.tell(fileName,actor);

		// NOTE(review): this is sent straight to the server, not after the file's
		// lines, so it can arrive before any line; the server's priority mailbox
		// only reorders messages that are queued at the same time.
		remoteActor.tell("DISPLAY_LIST");

		// NOTE(review): shutdown is requested immediately after the asynchronous
		// tell() calls above, so the file may not be read or forwarded before the
		// client system stops. A completion signal (e.g. reacting to the end of
		// the file) would be needed to shut down safely - confirm intent.
		system.shutdown();

	}

	/** Bootable hook; currently a no-op (main() does all the work). */
	public void shutdown() {
		// Intentionally empty: the system created in main() is not stored here.

	}

	/** Bootable hook; currently a no-op (main() does all the work). */
	public void startup() {
		// Intentionally empty.

	}

}

package org.akka.essentials.wc.mapreduce.example.client;

/**
 * Created by admin on 2017/3/21.
 */

import akka.actor.ActorRef;
import akka.actor.UntypedActor;

public class ClientActor extends UntypedActor {

    /** Sentinel that FileReadActor sends after the last line of the file. */
    private static final String EOF = "EOF";

    /** Server-side actor that receives each line of text. */
    private ActorRef remoteServer = null;
    /** Wall-clock time (ms) at which this actor started; used by the timing report. */
    private long start;

    /**
     * @param inRemoteServer reference to the remote actor that text lines are forwarded to
     */
    public ClientActor(ActorRef inRemoteServer){
        this.remoteServer = inRemoteServer;
    }

    /**
     * Forwards every text line to the remote server.
     *
     * The end-of-file sentinel is deliberately NOT forwarded: the server treats
     * every non-"DISPLAY_LIST" string as text to be word-counted, so forwarding
     * it would add a spurious word "eof" to the results.
     *
     * @param message a line of text (String); anything else is reported as unhandled
     */
    @Override
    public void onReceive(Object message){
        if (!(message instanceof String)) {
            unhandled(message);
            return;
        }
        String msg = (String) message;
        if (EOF.equals(msg)) {
            return;
        }
        remoteServer.tell(msg);
    }

    /** Records the start time for the report printed in {@link #postStop()}. */
    @Override
    public void preStart(){
        start = System.currentTimeMillis();
    }

    /** Prints the whole seconds elapsed between start and stop of this actor. */
    @Override
    public void postStop(){
        long timeSpent = (System.currentTimeMillis() - start)/ 1000;
        System.out.println(String.format("\n\tClientActor estimate: \t\t\n\tCalculation time: \t%s Secs", timeSpent));

    }

}

package org.akka.essentials.wc.mapreduce.example.client;

/**
 * Created by admin on 2017/3/21.
 */
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import akka.actor.UntypedActor;

public class FileReadActor extends UntypedActor {

    /**
     * Reads the classpath resource named by a {@code String} message and sends
     * each line to the sender, followed by the sentinel {@code "EOF"}.
     *
     * A missing resource or an I/O failure is reported on stderr; in that case
     * no {@code "EOF"} is sent.
     *
     * @param message name of the resource to read (looked up via the thread's
     *                context class loader)
     * @throws IOException declared for compatibility; I/O errors are reported, not rethrown
     * @throws IllegalArgumentException if {@code message} is not a {@code String}
     */
    @Override
    public void onReceive(Object message)throws  IOException{
        if (!(message instanceof String)) {
            throw new IllegalArgumentException("Unknown message [" + message + "]");
        }
        String fileName = (String) message;

        // getResourceAsStream returns null for a missing resource; the original
        // getResource(...).openStream() threw an uncaught NullPointerException here.
        java.io.InputStream in = Thread.currentThread().getContextClassLoader()
                .getResourceAsStream(fileName);
        if (in == null) {
            System.err.format("Resource not found on classpath: %s%n", fileName);
            return;
        }

        // try-with-resources closes the reader (and the underlying stream) on every path.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println("line :");
                System.out.println(line);

                getSender().tell(line);
            }
            System.out.println("All lines send !");
            getSender().tell("EOF");
        } catch (IOException x) {
            System.err.format("IOException: %s%n", x);
        }
    }
}

server端代码

package org.akka.essentials.wc.mapreduce.example.server;

/**
 * Created by admin on 2017/3/21.
 */

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import akka.actor.UntypedActor;

public class AggregateActor extends UntypedActor {

    /** Running totals: word -> occurrences summed over every partial map received. */
    private Map<String, Integer> finalReducedMap = new HashMap<>();

    /**
     * Handles two kinds of message:
     * a {@code Map<String, Integer>} of partial counts is merged into the totals;
     * the string {@code "DISPLAY_LIST"} prints the totals to stdout.
     * Everything else is ignored.
     */
    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof Map) {
            @SuppressWarnings("unchecked")
            Map<String, Integer> partialCounts = (Map<String, Integer>) message;
            aggregateInMemoryReduce(partialCounts);
        } else if (message instanceof String && "DISPLAY_LIST".equals(message)) {
            System.out.println(finalReducedMap.toString());
        }
    }

    /** Adds each count in {@code reducedList} onto the matching running total. */
    private void aggregateInMemoryReduce(Map<String, Integer> reducedList) {
        for (String word : reducedList.keySet()) {
            Integer incoming = reducedList.get(word);
            if (finalReducedMap.containsKey(word)) {
                Integer combined = incoming + finalReducedMap.get(word);
                finalReducedMap.put(word, combined);
            } else {
                finalReducedMap.put(word, incoming);
            }
        }
    }
}


package org.akka.essentials.wc.mapreduce.example.server;

/**
 * Created by admin on 2017/3/21.
 */

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.StringTokenizer;

import akka.actor.ActorRef;
import akka.actor.UntypedActor;

public class MapActor extends UntypedActor {
    // English stop-word list; matching words are excluded from the counts.
    // NOTE(review): "thickv" and "fify" look like typos for "thick" and "fifty";
    // kept as-is so the set of excluded words does not change.
    String[] STOP_WORDS = { "a", "about", "above", "across", "after",
            "afterwards", "again", "against", "all", "almost", "alone",
            "along", "already", "also", "although", "always", "am", "among",
            "amongst", "amoungst", "amount", "an", "and", "another", "any",
            "anyhow", "anyone", "anything", "anyway", "anywhere", "are",
            "around", "as", "at", "back", "be", "became", "because", "become",
            "becomes", "becoming", "been", "before", "beforehand", "behind",
            "being", "below", "beside", "besides", "between", "beyond", "bill",
            "both", "bottom", "but", "by", "call", "can", "cannot", "cant",
            "co", "con", "could", "couldnt", "cry", "de", "describe", "detail",
            "do", "done", "down", "due", "during", "each", "eg", "eight",
            "either", "eleven", "else", "elsewhere", "empty", "enough", "etc",
            "even", "ever", "every", "everyone", "everything", "everywhere",
            "except", "few", "fifteen", "fify", "fill", "find", "fire",
            "first", "five", "for", "former", "formerly", "forty", "found",
            "four", "from", "front", "full", "further", "get", "give", "go",
            "had", "has", "hasnt", "have", "he", "hence", "her", "here",
            "hereafter", "hereby", "herein", "hereupon", "hers", "herself",
            "him", "himself", "his", "how", "however", "hundred", "ie", "if",
            "in", "inc", "indeed", "interest", "into", "is", "it", "its",
            "itself", "keep", "last", "latter", "latterly", "least", "less",
            "ltd", "made", "many", "may", "me", "meanwhile", "might", "mill",
            "mine", "more", "moreover", "most", "mostly", "move", "much",
            "must", "my", "myself", "name", "namely", "neither", "never",
            "nevertheless", "next", "nine", "no", "nobody", "none", "noone",
            "nor", "not", "nothing", "now", "nowhere", "of", "off", "often",
            "on", "once", "one", "only", "onto", "or", "other", "others",
            "otherwise", "our", "ours", "ourselves", "out", "over", "own",
            "part", "per", "perhaps", "please", "put", "rather", "re", "same",
            "see", "seem", "seemed", "seeming", "seems", "serious", "several",
            "she", "should", "show", "side", "since", "sincere", "six",
            "sixty", "so", "some", "somehow", "someone", "something",
            "sometime", "sometimes", "somewhere", "still", "such", "system",
            "take", "ten", "than", "that", "the", "their", "them",
            "themselves", "then", "thence", "there", "thereafter", "thereby",
            "therefore", "therein", "thereupon", "these", "they", "thickv",
            "thin", "third", "this", "those", "though", "three", "through",
            "throughout", "thru", "thus", "to", "together", "too", "top",
            "toward", "towards", "twelve", "twenty", "two", "un", "under",
            "until", "up", "upon", "us", "very", "via", "was", "we", "well",
            "were", "what", "whatever", "when", "whence", "whenever", "where",
            "whereafter", "whereas", "whereby", "wherein", "whereupon",
            "wherever", "whether", "which", "while", "whither", "who",
            "whoever", "whole", "whom", "whose", "why", "will", "with",
            "within", "without", "would", "yet", "you", "your", "yours",
            "yourself", "yourselves" };

    List<String> STOP_WORDS_LIST = Arrays.asList(STOP_WORDS);

    /** Hash-based copy of the stop words: O(1) lookups instead of a linear scan per token. */
    private final Set<String> stopWordSet = new HashSet<>(STOP_WORDS_LIST);

    /** Downstream reduce router that receives each line's list of results. */
    private ActorRef actor = null;

    public MapActor(ActorRef inReduceActor){actor = inReduceActor;}

    /**
     * Splits {@code line} on whitespace and emits a {@code Result(word, 1)} for
     * every lower-cased token that is purely alphabetic and not a stop word.
     * Tokens carrying punctuation (e.g. "lord,") are skipped, not stripped.
     */
    private List<Result> evaluateExpression(String line){
        List<Result> list = new ArrayList<>();
        StringTokenizer parser = new StringTokenizer(line);
        while (parser.hasMoreTokens()) {
            // Locale.ROOT keeps case mapping independent of the JVM default locale
            // (a Turkish default locale would otherwise map 'I' to dotless 'ı').
            String word = parser.nextToken().toLowerCase(Locale.ROOT);
            if (isAlpha(word) && !stopWordSet.contains(word)) {
                list.add(new Result(word, 1));
            }
        }
        return list;
    }

    /**
     * Returns true if every character of {@code s} is an ASCII letter (A-Z, a-z).
     * Checked directly on the characters so the default locale's case mapping
     * (e.g. Turkish 'i' -> 'İ') cannot cause false rejections.
     */
    private boolean isAlpha(String s){
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            boolean asciiLetter = (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z');
            if (!asciiLetter) {
                return false;
            }
        }
        return true;
    }

    /**
     * Maps one line of text (String) to a list of word results and sends it to
     * the reduce router.
     *
     * @throws IllegalArgumentException for any non-String message
     */
    @Override
    public void onReceive(Object message){
        if(message instanceof String){
            String work = (String) message;
            List<Result> list = evaluateExpression(work);
            actor.tell(list);
        }else {
            throw new IllegalArgumentException("Unknown message [" + message + "]");
        }
    }
}

package org.akka.essentials.wc.mapreduce.example.server;

/**
 * Created by admin on 2017/3/21.
 */
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

import akka.actor.ActorRef;
import akka.actor.UntypedActor;

public class ReduceActor extends UntypedActor {
    /** Downstream AggregateActor that receives each reduced (partial) word-count map. */
    private ActorRef actor = null;
    public ReduceActor(ActorRef inAggregateActor){
        actor = inAggregateActor;
    }

    /**
     * Reduces a {@code List<Result>} produced by a MapActor into per-word counts
     * and forwards the resulting map to the aggregate actor.
     *
     * Previously this method was empty, so no counts ever reached the
     * aggregator and DISPLAY_LIST always printed an empty map.
     *
     * @throws IllegalArgumentException for any message that is not a List
     */
    @Override
    public void onReceive(Object message)throws Exception{
        if (message instanceof List) {
            @SuppressWarnings("unchecked")
            List<Result> work = (List<Result>) message;
            actor.tell(reduce(work));
        } else {
            throw new IllegalArgumentException("Unknown message [" + message + "]");
        }
    }

    /**
     * Sums occurrence counts per word.
     *
     * @param list results to combine (one entry per mapped word occurrence)
     * @return word -> total occurrences, sorted by word
     */
    private NavigableMap<String , Integer> reduce(List<Result>list){
        NavigableMap<String, Integer> reducedMap = new ConcurrentSkipListMap<>();
        for (Result result : list) {
            String word = result.getWord();
            int occurrences = result.getNoOfInstsnces();
            // ConcurrentSkipListMap never holds null values, so null means "absent".
            Integer current = reducedMap.get(word);
            reducedMap.put(word, current == null ? occurrences : current + occurrences);
        }
        return reducedMap;
    }


}

package org.akka.essentials.wc.mapreduce.example.server;

/**
 * Created by admin on 2017/3/21.
 */

import java.io.Serializable;

public class Result implements Serializable {

    private static final long serialVersionUID = 57L;

    /** The word this entry counts. */
    private String word;

    /**
     * Number of occurrences this entry represents. The (misspelled) field name
     * is kept as-is because it is part of the serialized form.
     */
    private int no_of_instsnces;

    /**
     * Creates an entry for {@code word} with the given occurrence count.
     *
     * @param word            the word being counted
     * @param no_of_instsnces number of occurrences
     */
    public Result(String word, int no_of_instsnces) {
        setWord(word);
        setNoOfInstsnces(no_of_instsnces);
    }

    /** @return the word being counted */
    public String getWord() {
        return this.word;
    }

    /** @param word the new word */
    public void setWord(String word) {
        this.word = word;
    }

    /** @return the number of occurrences */
    public int getNoOfInstsnces() {
        return this.no_of_instsnces;
    }

    /** @param no_of_instsnces the new number of occurrences */
    public void setNoOfInstsnces(int no_of_instsnces) {
        this.no_of_instsnces = no_of_instsnces;
    }

}

package org.akka.essentials.wc.mapreduce.example.server;

/**
 * Created by admin on 2017/3/21.
 */

import akka.actor.ActorRef;
import akka.actor.UntypedActor;

public class WCMapReduceActor extends UntypedActor {
    /** Router of MapActors that receives every text line. */
    private final ActorRef mapRouter;
    /** AggregateActor that holds the final counts and prints them on DISPLAY_LIST. */
    private final ActorRef aggregateActor;

    /**
     * @param inAggregateActor actor that receives DISPLAY_LIST requests
     * @param inMapRouter      router that receives text lines
     */
    public WCMapReduceActor(ActorRef inAggregateActor, ActorRef inMapRouter) {
        this.aggregateActor = inAggregateActor;
        this.mapRouter = inMapRouter;
    }

    /**
     * Entry point of the server: a String "DISPLAY_LIST" is passed to the
     * aggregate actor (keeping the original sender); any other String is sent
     * to the map router as a line of text. Non-String messages are ignored.
     */
    @Override
    public void onReceive(Object message) {
        if (!(message instanceof String)) {
            return;
        }
        boolean displayRequest = "DISPLAY_LIST".equals(message);
        if (displayRequest) {
            System.out.println("Got Display Message");
            aggregateActor.tell(message, getSender());
        } else {
            mapRouter.tell(message);
        }
    }

}

package org.akka.essentials.wc.mapreduce.example.server;

/**
 * Created by admin on 2017/3/21.
 */
import akka.actor.*;
import akka.dispatch.PriorityGenerator;
import akka.dispatch.UnboundedPriorityMailbox;
import akka.kernel.Bootable;
import akka.routing.RoundRobinRouter;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class WCMapReduceServer implements Bootable {

    /** Number of map and reduce workers used by main() and the no-arg constructor. */
    private static final int DEFAULT_WORKERS = 5;

    private ActorRef mapRouter;
    private ActorRef reduceRouter;
    private ActorRef aggregateActor;
    private ActorSystem system;

    private ActorRef wcMapReduceActor;

    /**
     * No-arg constructor so the class can be booted by the Akka microkernel,
     * which instantiates {@code Bootable} classes reflectively. Uses
     * {@link #DEFAULT_WORKERS} map and reduce workers.
     */
    public WCMapReduceServer() {
        this(DEFAULT_WORKERS, DEFAULT_WORKERS);
    }

    /**
     * Creates the "WCMapReduceApp" actor system and wires the pipeline
     * WCMapReduceActor -> map router -> reduce router -> AggregateActor.
     *
     * @param no_of_reduce_workers number of ReduceActor routees
     * @param no_of_map_workers    number of MapActor routees
     */
    public WCMapReduceServer(int no_of_reduce_workers, int no_of_map_workers){
        system = ActorSystem.create("WCMapReduceApp", ConfigFactory.load().getConfig("WCMapReduceApp"));
        // Created before the routers: their factories read these fields.
        aggregateActor = system.actorOf(new Props(AggregateActor.class));
        reduceRouter = system.actorOf(new Props(
                new UntypedActorFactory() {
                    public UntypedActor create() throws Exception {
                        return new ReduceActor(aggregateActor);
                    }
                }
        ).withRouter(new RoundRobinRouter(no_of_reduce_workers)));

        mapRouter = system.actorOf(new Props(
                new UntypedActorFactory() {
                    public UntypedActor create() throws Exception {
                        return new MapActor(reduceRouter);
                    }
                }
        ).withRouter(new RoundRobinRouter(no_of_map_workers)));

        // Named so the client can look it up at /user/WCMapReduceActor; runs on the
        // dispatcher whose mailbox is MyPriorityMailBox (see config).
        wcMapReduceActor = system.actorOf(new Props(
                new UntypedActorFactory() {
                    public UntypedActor create() throws Exception {
                        return new WCMapReduceActor(aggregateActor, mapRouter);
                    }
                }
        ).withDispatcher("priorityMailBox-dispatcher"), "WCMapReduceActor");
    }

    public static void main(String[] args){
        new WCMapReduceServer(DEFAULT_WORKERS, DEFAULT_WORKERS);
    }


    /** Bootable hook. Nothing to do: the constructor already starts the system and actors. */
    public void startup(){

    }

    /** Bootable hook: stops the actor system created in the constructor (previously a no-op). */
    public void shutdown(){
        system.shutdown();
    }

    /**
     * Priority mailbox for WCMapReduceActor. In Akka's PriorityGenerator a LOWER
     * value means the message is dequeued FIRST, so among messages queued at the
     * same time: text lines (0) come before "DISPLAY_LIST" (2), which comes
     * before PoisonPill (3).
     */
    public static class MyPriorityMailBox extends UnboundedPriorityMailbox{

        public MyPriorityMailBox(ActorSystem.Settings settings, Config config){
            super(new PriorityGenerator() {
                @Override
                public int gen(Object message) {
                    if(message.equals("DISPLAY_LIST")){
                        return 2;
                    }else if(message.equals(PoisonPill.getInstance())){
                        return 3;
                    }else{
                        return 0;
                    }
                }
            });
        }

    }


}

配置文件

# Server-side settings; WCMapReduceServer loads this section with
# ConfigFactory.load().getConfig("WCMapReduceApp").
WCMapReduceApp{
 # Shared settings, presumably from a separate "common" config file - confirm.
 include "common"
  akka {
  	actor {
    	provider = "akka.remote.RemoteActorRefProvider"
  	}
   remote {
    transport = "akka.remote.netty.NettyRemoteTransport"
    netty {
      # Must match the address the client looks up in Client.main:
      # akka://WCMapReduceApp@127.0.0.1:2552/user/WCMapReduceActor
      hostname = "127.0.0.1"
      port = 2552
    }
 	}
  }
  # Dispatcher id used by WCMapReduceServer via withDispatcher("priorityMailBox-dispatcher");
  # its mailbox is the nested class WCMapReduceServer.MyPriorityMailBox.
  priorityMailBox-dispatcher {
  	mailbox-type = "org.akka.essentials.wc.mapreduce.example.server.WCMapReduceServer$MyPriorityMailBox"
  }
}

# Client-side settings; Client.main loads this section with
# ConfigFactory.load().getConfig("WCMapReduceClientApp").
WCMapReduceClientApp{
 include "common"
  akka {
  	actor {
    	# NOTE(review): no remote hostname/port is set in this section; if the effective
    	# port (from "common" or defaults) is 2552 it would clash with the server when
    	# both run on the same host - confirm.
    	provider = "akka.remote.RemoteActorRefProvider"
  	}
  }	
}

说明:

微内核（Akka Microkernel，即实现了 `Bootable` 接口的类）用于将应用作为独立进程单独部署；
另一种可选的部署方式是像普通 Java Web 应用一样，将应用部署在 Web 容器中。

一般地，一组相互关联的 Actor 可以组成一个独立处理业务逻辑的 Akka 系统（ActorSystem）。
两个 Akka 系统之间通信时，通常通过 `actorFor` 按路径获取另一个系统中相关 Actor 的引用。
这种方式常用于客户端/服务器通信：两端各自是一个独立的 Akka 系统，但仍需要相互通信。

在 Actor 路径（path）中，`user` 是所有通过 `system.actorOf` 创建的顶层 Actor 的默认父节点，例如本例中的 `/user/WCMapReduceActor`。

在 Akka 中，Actor 的终止同样可以通过消息来驱动：例如 `gracefulStop` 会向目标 Actor 的消息队列中放入一个
`PoisonPill` 实例，Actor 处理到该消息时便会执行终止流程。
因此，也可以借助优先级邮箱来调整各类消息的处理顺序：
在 `PriorityGenerator` 类的 `int gen(Object message)` 方法中为不同消息返回不同的整数值，
数值越小优先级越高（越先被处理）。
本例中普通文本行返回 0，`DISPLAY_LIST` 返回 2，`PoisonPill` 返回 3，
因此打印结果的请求会排在已入队的文本行之后处理，仅先于 `PoisonPill`。




以上是云栖社区小编为您精心准备的的内容,在云栖社区的博客、问答、公众号、人物、课程等栏目也有的相关内容,欢迎继续使用右上角搜索按钮进行搜索编程基础 akka ,以便于您获取更多的相关知识。

弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率

40+云计算产品,6个月免费体验

现在注册,免费体验40+云产品,及域名优惠!

云服务器9.9元/月,大学必备