例題04b ベースプロセス間のストリーム受け渡し

(1)概要

エージェントが持つベースプロセス間でストリームを受け渡す方法を示す。
(注意:ネームサーバNameServer.jarの起動が必要。)

(2)エージェント

エージェント名ファイル名説明
TimeSenderTimeSender.dashストリームを送信するエージェント
TimeSender.class
ベースプロセス
TimeReceiverTimeReceiver.dashストリームを受信するエージェント
TimeReceiver.class
ベースプロセス

(2.1)TimeSender

●知識記述ファイル (TimeSender.dash)
(agent TimeSender
 
 (property
  (create :author "yu" :modifiedBy "taka")
 )
 
 (initial_facts
 )
 
 (knowledge
    (rule startup
      (Msg :performative __INIT_I)
      (Status :name ?name)
      -->
      (inspect nonstop)
      (loadBP baseProcess.dashSample.TimeSender)  // BP読み込み
      (startBP)                          // run()呼び出し
      (control startup ())               // startup()呼び出し
      (control setWindowTitle(?name))    // setWindowTitle()呼び出し
      (bind ?t (current-time))
      //キーとなるファクトを作成
      (make (key :name ?name :time ?t))
    )
    
    //パイプの作成とストリーム送信開始
    (rule sendStream
      (key :name ?name :time ?t) = ?arg
      -->
      (bind ?key (toString ?arg))
      //キーを用いてストリーム送受信用のパイプを作成し、
      //ストリーム送信開始
      (control startOutputStream(?key))
    )
    
    //キー要求を受け取り、キーを送る
    (rule receive-keyRequest
      (Msg :performative keyRequest :from ?sender ) = ?msg
      (key :name ?name :time ?t) = ?arg
      -->
      (bind ?key (toString ?arg))
      (send :performative sendKey :to ?sender :content (?key))
      (remove ?msg)
    )
  )
)

●ベースプロセス (TimeSender.java)
package baseProcess.dashSample;

import java.util.*;
import java.awt.*;
import javax.swing.*;
import javax.swing.border.*;
import dash.*;
import java.io.*;

public class TimeSender extends JFrame implements DashBP {

	private DashAgent agent;

	JTextArea textarea;
	String key;
	BufferedWriter out;
	OutputStream os = null;
	//ループ用フラグ
	boolean flag = true;
	//同期用オブジェクト
	Object obj = new Object();

	/** コンストラクタ。アクション(loadBP)実行時に呼ばれる。 */
	public TimeSender() {
		super("TimeSender");
	}

	/** アクション(loadBP)実行時に呼ばれる。*/
	public void setAgent(DashAgent agent) {
		this.agent = agent;
	}

	/** アクション(startBP)実行時に呼ばれる */
	public void run() {
		//出力ストリーム取得まで待つ
		synchronized (obj) {
			try {
				obj.wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		//出力ストリーム取得後の処理
		out = new BufferedWriter(new OutputStreamWriter(os));
		while (flag) {
			Date now = new Date();
			textarea.setText(now.toString());
			try {
				out.write(now.toString() + "\n");
				out.flush();
			} catch (IOException e) {
				e.printStackTrace();
			}
			try {
				Thread.sleep(1500);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

	/** エージェントの終了時に呼ばれる */
	public void finalizeBP() {
		flag = false;
		if (out != null) {
			//Pipeの切断処理を実行
			agent.pm.close(key);
		}
		dispose();
	}

	/** ルールstartup内で呼び出される */
	public void startup() {
		setup();
		// ときどきnullなことがある
		while (textarea == null)
			try {
				Thread.sleep(500);
			} catch (InterruptedException e) {
			}
		textarea.append("起動\n");
	}

	/** ルールstartup内で呼び出される */
	public void setWindowTitle(Object[] str1) {
		setTitle("sWin:" + str1[0]);
	}

	//Pipe作成、ストリーム送信開始
	public void startOutputStream(Object[] str) {
		key = (String) str[0];
		// キーを用いてPipeの作成処理を実行
		agent.pm.create(key);

		synchronized (obj) {
			// キーを用いて出力ストリームを取得
			os = agent.pm.getOutputStream(key);
			obj.notify();
		}
	}

	/** ウィンドウを作る */
	private void setup() {
		textarea = new JTextArea(6, 20);
		JScrollPane sc = new JScrollPane(textarea);
		sc.setBorder(new TitledBorder("Output"));
		getContentPane().add(sc, BorderLayout.CENTER);
		pack();
		setVisible(true);
	}
}

(2.2)TimeReceiver

●知識記述ファイル (TimeReceiver.dash)
(agent TimeReceiver
 
 (property
  (create :author "yu" :modifiedBy "taka")
 )
 
 (initial_facts
 )
 
  (knowledge
    (rule startup
      (Msg :performative __INIT_I)
      (Status :name ?name)
      -->
      (inspect nonstop)
      (loadBP baseProcess.dashSample.TimeReceiver)  // BP読み込み
      (startBP)                          // run()呼び出し
      (control startup ())
    )
 
    // ベースプロセスからのイベントを受け取り、
    // TimeSenderにキーを要求する
    (rule event-handling
      (Msg :performative __Event :content (?string)) = ?msg
      -->
      (bind ?agents (lookup :rname TimeSender))
      (bind ?agent (shift ?agents))
      (send :performative keyRequest :to ?agent:name :content ())
      (remove ?msg)
     )
    
    // TimeSenderから、キーを受け取る
    (rule receive-sendKey
     (Msg :performative sendKey :from ?sender :content (?key)) = ?msg
     -->
     //キーを用いてストリーム受信を開始
     (control startInputStream(?key))
     (remove ?msg)
    )
  )
)

●ベースプロセス (TimeReceiver.java)
package baseProcess.dashSample;

import java.awt.*;
import java.awt.event.*;
import javax.swing.*;
import javax.swing.border.*;
import dash.*;
import java.io.*;

public class TimeReceiver extends JFrame implements DashBP, ActionListener {

	private DashAgent agent;

	JTextArea textarea;
	JButton startbutton;
	JButton stopbutton;
	BufferedReader in;
	InputStream is = null;
	//ループ用フラグ
	boolean flag = true;
	//同期用オブジェクト
	Object obj = new Object();

	/** コンストラクタ。アクション(loadBP)実行時に呼ばれる。 */
	public TimeReceiver() {
		super("TimeReceiver");
	}

	/** アクション(loadBP)実行時に呼ばれる。*/
	public void setAgent(DashAgent agent) {
		this.agent = agent;
	}

	/** アクション(startBP)実行時に呼ばれる
	 * 
	 * */
	public void run() {
		//入力ストリーム取得まで待つ	
		synchronized (obj) {
			try {
				obj.wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		//入力ストリーム取得後の処理
		in = new BufferedReader(new InputStreamReader(is));
		try {
			while (flag) {
				String ss = in.readLine();
				if (ss != null)
					textarea.append(ss + "\n");
			}
		} catch (IOException e) {
			finalizeBP();
			//System.out.println("Pipeの接続が切れました");
		}
	}

	//ストリーム受信開始
	public void startInputStream(Object[] str) {
		synchronized (obj) {
			//入力ストリームを取得
			is = agent.pm.getInputStream((String) str[0]);
			obj.notify();
		}
	}

	/** エージェントの終了時に呼ばれる */
	public void finalizeBP() {
		flag = false;
		if (in != null) {
			try {
				in.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		dispose();
	}

	/** startボタンを押し、エージェントにイベントを上げる */
	public void actionPerformed(ActionEvent ae) {
		//System.out.println(((JButton)ae.getSource()).getText());
		if (agent != null) {
			if (((JButton) ae.getSource()).getText().equals("Start")) {
				agent.raiseEvent("start");
			} else if (((JButton) ae.getSource()).getText().equals("Finish")) {
				dispose();
			}
		}
	}

	/** ルールstartup内で呼び出される */
	public void startup() {
		setup();
		// ときどきnullなことがある
		while (textarea == null)
			try {
				Thread.sleep(500);
			} catch (InterruptedException e) {
			}
		textarea.append("起動\n");
	}

	/** ウィンドウを作る */
	private void setup() {
		textarea = new JTextArea(6, 20);
		JScrollPane sc = new JScrollPane(textarea);
		sc.setBorder(new TitledBorder("Input"));

		startbutton = new JButton("Start");
		startbutton.addActionListener(this);

		stopbutton = new JButton("Finish");
		stopbutton.addActionListener(this);

		JPanel panel = new JPanel();
		panel.add(startbutton);
		panel.add(stopbutton);

		GridLayout layout = new GridLayout(1, 2);
		panel.setLayout(layout);

		getContentPane().add(sc, BorderLayout.CENTER);
		getContentPane().add(panel, BorderLayout.SOUTH);

		pack();
		setVisible(true);
	}
}

(3)処理の流れ


TimeSenderエージェントTimeReceiverエージェントを起動する

TimeSenderエージェントは、キーとなるファクトを生成し、そのキーを用いてストリーム送受信用のパイプを作成する( agent.pm.create(key)

TimeSenderエージェントは、キーを用いて出力ストリームを取得( os = agent.pm.getOutputStream(key) )し、ストリームを流す


TimeReceiverエージェントは、ベースプロセスから受信開始のイベントを受け取ると、TimeSenderエージェントに対しキー要求メッセージを投げ、キーをもらう

TimeReceiverエージェントは、キーをもとに入力ストリームを取得( is = agent.pm.getInputStream(key) )し、ストリームを受信する