DatagramSocketで遊ぶ

Socketで遊んでみる第2回目です。
Adobe AIR 2で利用できるようになったUDP通信のためのSocket、DatagramSocketを使って遊びます。

下準備

Adobe AIR 2のAPIが使えるように、SDKをマージしておきましょう。
[FLEX AIR2 Mac] Interacting with a native process のサンプルを動かす メモ

処理の流れ

UDPの通信の仕方は簡単です。

  1. 受信側がrecive()する
  2. 送信側がsend()する
  3. 使い終わったらSocketをclose()する

という感じ。コネクションの確立などは気にする必要がありません。(まあUDPってそういう物だし。)

受信側がrecive()する

次のようにUDPのパケットが来るのを待ちます。

var port:int = 22222;
var address:String = "0.0.0.0";
var datagramSocket:DatagramSocket = new DatagramSocket();
datagramScoket.bind(port, address);
datagramScoket.recive();

この時、DatagramSocketにDatagramSocketDataEvent.DATAに対するイベントリスナを追加して、受信したデータを調べる事ができます。

例えば、次のようにイベントリスナを追加した場合、

//イベントリスナを登録
datagramSocket.addEventListener(DatagramSocketDataEvent.DATA, dataEventHandler);

以下のようにして受信したデータを調べる事ができます。

/**
 * DatagramDataEvent.DATAイベントを受け取った際に呼ばれるメソッド
 */
function dataEventHandler(event:DatagramSocketDataEvent):void{
  var byteArray:ByteArray = event.data;
  trace(byteArray.length + " bytes recived...");
}
送信側がsend()する

次のようにデータを送信します。

//送るデータ
var bytes:ByteArray = new ByteArray();
bytes.writeMultiByte("test message", "UTF-8");
//送信先アドレス
var address:String = "127.0.0.1";
//送信先ポート
var port:int = 22222;
datagramSocket.send(bytes, 0, 0, address, port);

この時、送信するバイト列の長さがMTUを越えないようにします。

使い終わったらSocketをclose()する

使い終わったらSocketを閉じます。

try{
  datagramSocket.close();
}catch(error:Error){
  trace(error.getStatckTrace();
}

サンプルソース

以下、DatagramSocketを使った受信側、送信側の両方を実現するUdpServer.asです。
実際は上記以外にいろいろ余分な事をしています。(UDPの伝送遅延を測るのが目的のソースなもので。)

UdpServer.as
package org.mineap.AIRSocketTest.udp
{
	import flash.events.DatagramSocketDataEvent;
	import flash.events.Event;
	import flash.events.IOErrorEvent;
	import flash.filesystem.File;
	import flash.filesystem.FileMode;
	import flash.filesystem.FileStream;
	import flash.net.DatagramSocket;
	import flash.utils.ByteArray;
	
	import mx.charts.chartClasses.DataDescription;
	
	import org.mineap.AIRSocketTest.util.Logger;

	/**
	 * 
	 * @author shiraminekeisuke(MineAP)
	 * 
	 */
	public class UdpServer
	{
		private var _address:String;
		
		private var _port:int;
		
		private var _file:File;
		
		private var _bytes:ByteArray;
		
		private var _logger:Logger = Logger.instance;
		
		private var _datagramSocket:DatagramSocket;
		
		private var _datagramScoketForRecive:DatagramSocket;
		
		private var _datagramSocketForAct:DatagramSocket;
		
		private var _progressCount:Number = 0;
		
		private var _totalBytes:Number = 0;
		
		private var _fileStream:FileStream;
		
		private const EOF:String = "EoF";
		
		private const ACT:String = "Act";
		
		private var _eof_bytes:ByteArray;
	
		private var _act_bytes:ByteArray;
		
		private var _startTime:Date;
		
		private var _firstDatagramSent:Date;
		
		/**
		 * 
		 * 
		 */
		public function UdpServer()
		{
			this._eof_bytes = new ByteArray();
			this._eof_bytes.writeMultiByte(EOF, "UTF-8");
			
			this._act_bytes = new ByteArray();
			this._act_bytes.writeMultiByte(ACT, "UTF-8");
		}
		
		/**
		 * 送信するファイルを設定します
		 * @param file
		 * 
		 */
		public function set data(file:File):void{
			this._file = file;
		}
		
		/**
		 * 指定されたアドレスおよびポートにUDPでデータを送信します
		 * 
		 * @param address
		 * @param port
		 * 
		 */
		public function sent(address:String, port:int):void{
			
			this._bytes = new ByteArray;
			
			if(this._file != null){
			
				_logger.put("送信準備(ファイルの読み込み:" + this._file.nativePath + ")");
				
				this._fileStream = new FileStream();
				this._fileStream.addEventListener(IOErrorEvent.IO_ERROR, ioErrorEventHandler);
				this._fileStream.open(this._file, FileMode.READ);
				this._fileStream.readBytes(this._bytes);
			}else{
				_logger.put("ファイルが指定されていません。");
				
				return;
			}
			
			_logger.put("ファイルの読み込み完了:" + this._bytes.length + " bytes");
			
			this._datagramSocket = new DatagramSocket();
			
			try{
				
				this._datagramSocket.addEventListener(IOErrorEvent.IO_ERROR, ioErrorEventHandler);
				
				this._logger.put("送信開始:" + address + ", " + port);
				
				var mtu:uint = 1400;
				var offset:uint = 0;
				var len:uint = mtu;
				var count:uint = 1;
				
				// ACTの受け取り用
				this._firstDatagramSent = new Date();
				this.recive(port+1, "0.0.0.0");
				
				for(; this._bytes.length >= len*count; count++){
					
					// 1400 bytesごとに送信
					this._datagramSocket.send(this._bytes, offset, len, address, port);
					offset = count * len;
				}
				
				// あまりを送信
				len = this._bytes.length % mtu;
				if(len > 0){
					this._datagramSocket.send(this._bytes, offset, len, address, port);
				}
				
				this._datagramSocket.send(this._eof_bytes, 0, 0, address, port);
				
			}catch(error:Error){
				trace(error.getStackTrace());
				this._logger.put(error);
				try{
					this._datagramSocket.close();
				}catch(error:Error){
				}
			}
			
		}
		
		/**
		 * 指定されたポートでUDPの通信を待ち受けます
		 * 
		 * @param port
		 * @return 
		 * 
		 */
		public function recive(port:int, localAddress:String):void{
			
			this._port = port;
			this._address = localAddress;
			
			this._progressCount = 0;
			this._totalBytes = 0;
			
			this._datagramScoketForRecive = new DatagramSocket();
			
			try{

				this._datagramScoketForRecive.addEventListener(DatagramSocketDataEvent.DATA, dataEventHandler);
				this._datagramScoketForRecive.addEventListener(Event.CLOSE, closeEvnetHandler);
				this._datagramScoketForRecive.addEventListener(IOErrorEvent.IO_ERROR, ioErrorEventHandler);
				
				_logger.put("ポートをバインド:" + port);
				this._datagramScoketForRecive.bind(this._port, this._address);
				
				_logger.put("通信待ち受け開始");
				this._datagramScoketForRecive.receive();
				
			}catch(error:Error){
				this._logger.put(error);
				try{
					this._datagramScoketForRecive.close();
				}catch(error:Error){
				}
			}
			
		}
		
		/**
		 * 
		 * @param error
		 * 
		 */
		protected function closeEvnetHandler(event:Event):void{
			
			_logger.put("Total " + this._totalBytes + " bytes received.");
			trace(event);
		}
		
		/**
		 * 
		 * @param error
		 * 
		 */
		protected function ioErrorEventHandler(error:IOErrorEvent):void{
			_logger.put(error);
			this.close();
		}
		
		/**
		 * 
		 * @param event
		 * 
		 */
		protected function dataEventHandler(event:DatagramSocketDataEvent):void{
			
			if(this._progressCount == 0){
				this._startTime = new Date();
				
				if(this._firstDatagramSent == null){
					// 送信側でなければACTを返却
					this.sentAct(event.srcAddress, event.dstPort + 1);
				}else{
					
					var time:String = ((this._startTime.getTime() - this._firstDatagramSent.getTime())/(2000.0)) + " sec.";
					this._logger.status = time + " (UDP)";
					this._logger.put("通信遅延時間(片道): " + time);
					this._firstDatagramSent = null;
					return;
				}
			}
			
			this._progressCount++;
			this._totalBytes += event.data.length;
			
			if(this._progressCount % 1000 == 1){
				this._logger.put(this._totalBytes + " bytes received...");
			}
			
			var end:Boolean = true;
			if(event.data.length == _eof_bytes.length){
				for(var i:int = 0; _eof_bytes.length > i; i++){
					if(_eof_bytes[i] != event.data[i]){
						end = false;
						break;
					}
				}
			}else{
				end = false;
			}
			
			if(end){
				var endTime:Date = new Date();
				this._logger.put("EOFを受信:" + this._totalBytes + " bytes recived.");
				this._logger.put("経過時間:" + (endTime.getTime() - this._startTime.getTime())/1000.0 + " sec.");
			}
			
		}
		
		/**
		 * 指定されたServerに対して"ACT"を送信します。
		 * 
		 * @param server
		 * @param port
		 * 
		 */
		public function sentAct(server:String, port:int):void{
			
			this._datagramSocketForAct = new DatagramSocket();
			
			try{
				this._datagramSocketForAct.send(this._act_bytes, 0, 0, server, port);
				this._logger.put("ACTを返信:" + server + ", " + port);
			}catch(error:Error){
				trace(error.getStackTrace());
				try{
					this._datagramSocketForAct.close();
				}catch(error:Error){
					
				}
			}
		}
		
		/**
		 * 
		 * 
		 */
		public function close():void{
			
			try{
				this._fileStream.close();
				_logger.put("ローカルファイル読み込みストリームをクローズ");
			}catch(error:Error){
			}
			this._fileStream = null;
			
			try{
				this._datagramSocket.close();
				_logger.put("送信用ソケットをクローズ");
			}catch(error:Error){
			}
			this._datagramSocket = null;
			
			try{
				this._datagramSocket.close();
				_logger.put("受信用ソケットをクローズ");
			}catch(error:Error){
			}
			this._datagramScoketForRecive = null;
			
			
			try{
				this._datagramSocketForAct.close();
			}catch(error:Error){
			}
			this._datagramSocketForAct = null;
			
		}
		
	}
}

以上、DatagramSocketを使って遊んでみる、でした。