Java_io体系之PipedWriter、PipedReader简介、走进源码及示例——14-程序员宅基地

技术标签: java  

Java_io体系之PipedWriter、PipedReader简介、走进源码及示例——14


——管道字符输出流、必须建立在管道输入流之上、所以先介绍管道字符输出流。可以先看示例或者总结、总结写的有点Q、不喜可无视、有误的地方指出则不胜感激。


一:PipedWriter


1、类功能简介:


管道字符输出流、用于将当前线程的指定字符写入到与此线程对应的管道字符输入流中去、所以PipedReader(pr)、PipedWriter(pw)必须配套使用、缺一不可。管道字符输出流的本质就是调用pr中的方法将字符或者字符数组写入到pr中、这一点是与众不同的地方。所以pw中的方法很少也很简单、主要就是负责将传入的pr与本身绑定、配对使用、然后就是调用绑定的pr的写入方法、将字符或者字符数组写入到pr的缓存字符数组中。


2、PipedWriter API简介:


A:关键字

    private PipedReader sink;	与此PipedWriter绑定的PipedReader

    
    private boolean closed = false;		标示此流是否关闭。

B:构造方法

	PipedWriter(PipedReader snk)	根据传入的PipedReader构造pw、并将pr与此pw绑定
    
    PipedWriter()	创建一个pw、在使用之前必须与一个pr绑定



C:一般方法

	synchronized void connect(PipedReader snk)		将此pw与一个pr绑定
	
	void close()	关闭此流。
	
	synchronized void connect(PipedReader snk)		将此pw与一个pr绑定
	
	synchronized void flush()	flush此流、唤醒pr中所有等待的方法。
	
	void write(int c)	将一个整数写入到与此pw绑定的pr的缓存字符数组buf中去
	
	void write(char cbuf[], int off, int len)	将cbuf的一部分写入pr的buf中去


3、源码分析


package com.chy.io.original.code;

import java.io.IOException;

public class PipedWriter extends Writer {
	
	//与此PipedWriter绑定的PipedReader
    private PipedReader sink;

    //标示此流是否关闭。
    private boolean closed = false;

    /**
     * 根据传入的PipedReader构造pw、并将pr与此pw绑定
     */
    public PipedWriter(PipedReader snk)  throws IOException {
    	connect(snk);
    }
    
    /**
     * 创建一个pw、在使用之前必须与一个pr绑定
     */
    public PipedWriter() {
    }
    
    /**
     * 将此pw与一个pr绑定
     */
    public synchronized void connect(PipedReader snk) throws IOException {
        if (snk == null) {
            throw new NullPointerException();
        } else if (sink != null || snk.connected) {
		    throw new IOException("Already connected");
		} else if (snk.closedByReader || closed) {
	            throw new IOException("Pipe closed");
	    }
	        
		sink = snk;
		snk.in = -1;
		snk.out = 0;
        snk.connected = true;
    }

    /**
     * 将一个整数写入到与此pw绑定的pr的缓存字符数组buf中去
     */
    public void write(int c)  throws IOException {
        if (sink == null) {
            throw new IOException("Pipe not connected");
        }
        sink.receive(c);
    }

    /**
     * 将cbuf的一部分写入pr的buf中去
     */
    public void write(char cbuf[], int off, int len) throws IOException {
        if (sink == null) {
            throw new IOException("Pipe not connected");
        } else if ((off | len | (off + len) | (cbuf.length - (off + len))) < 0) {
        	throw new IndexOutOfBoundsException();
		}
		sink.receive(cbuf, off, len);
    }

    /**
     * flush此流、唤醒pr中所有等待的方法。
     */
    public synchronized void flush() throws IOException {
		if (sink != null) {
	            if (sink.closedByReader || closed) {
	                throw new IOException("Pipe closed");
	            }            
	            synchronized (sink) {
	                sink.notifyAll();
	            }
		}
    }

    /**
     * 关闭此流。
     */
    public void close()  throws IOException {
        closed = true;
		if (sink != null) {
		    sink.receivedLast();
		}
    }
}

4、实例演示:


因为PipedWriter必须与PipedReader结合使用、所以将两者的示例放在一起。

二:PipedReader


1、类功能简介:


管道字符输入流、用于读取对应绑定的管道字符输出流写入其内置字符缓存数组buffer中的字符、借此来实现线程之间的通信、pr中专门有两个方法供pw调用、receive(char c)、receive(char[] b, int off, intlen)、使得pw可以将字符或者字符数组写入pr的buffer中、

2、PipedReader API简介:


A:关键字

	boolean closedByWriter = false;		标记PipedWriter是否关闭
	
    boolean closedByReader = false;		标记PipedReader是否关闭
    
    boolean connected = false;			标记PipedWriter与标记PipedReader是否关闭的连接是否关闭

    Thread readSide; 	拥有PipedReader的线程
    
    Thread writeSide;	拥有PipedWriter的线程

    private static final int DEFAULT_PIPE_SIZE = 1024;		用于循环存放PipedWriter写入的字符数组的默认大小

    char buffer[];		用于循环存放PipedWriter写入的字符数组

    int in = -1;	buf中下一个存放PipedWriter调用此PipedReader的receive(int c)时、c在buf中存放的位置的下标。此为初始状态、即buf中没有字符

    int out = 0;	buf中下一个被读取的字符的下标


B:构造方法

	PipedReader(PipedWriter src)	使用默认的buf的大小和传入的pw构造pr
	
	PipedReader(PipedWriter src, int pipeSize)		使用指定的buf的大小和传入的pw构造pr
	
	PipedReader()		使用默认大小构造pr
	
	PipedReader(int pipeSize)		使用指定大小构造pr


C:一般方法

	void close()	清空buf中数据、关闭此流。
	
	void connect(PipedWriter src)	调用与此流绑定的pw的connect方法、将此流与对应的pw绑定
	
	synchronized boolean ready()	查看此流是否可读
	
	synchronized int read()		从buf中读取一个字符、以整数形式返回
	
	synchronized int read(char cbuf[], int off, int len)	将buf中读取一部分字符到cbuf中。
	
	synchronized void receive(int c)	pw调用此流的此方法、向pr的buf以整数形式中写入一个字符。
	
	synchronized void receive(char c[], int off, int len)	将c中一部分字符写入到buf中。
	
	synchronized void receivedLast()	提醒所有等待的线程、已经接收到了最后一个字符。


3、源码分析


package com.chy.io.original.code;

import java.io.IOException;

public class PipedReader extends Reader {
    boolean closedByWriter = false;
    boolean closedByReader = false;
    boolean connected = false;

    Thread readSide;
    Thread writeSide;

   /** 
    * 用于循环存放PipedWriter写入的字符数组的默认大小
    */ 
    private static final int DEFAULT_PIPE_SIZE = 1024;

    /**
     * 用于循环存放PipedWriter写入的字符数组
     */
    char buffer[];

    /**
     * buf中下一个存放PipedWriter调用此PipedReader的receive(int c)时、c在buf中存放的位置的下标。
     * in为-1时、说明buf中没有可读取字符、in=out时已经存满了。
     */
    int in = -1;

    /**
     * buf中下一个被读取的字符的下标
     */
    int out = 0;

    /**
     * 使用默认的buf的大小和传入的pw构造pr
     */
    public PipedReader(PipedWriter src) throws IOException {
    	this(src, DEFAULT_PIPE_SIZE);
    }

    /**
     * 使用指定的buf的大小和传入的pw构造pr
     */
    public PipedReader(PipedWriter src, int pipeSize) throws IOException {
		initPipe(pipeSize);
		connect(src);
    }


    /**
     * 使用默认大小构造pr
     */
    public PipedReader() {
    	initPipe(DEFAULT_PIPE_SIZE);
    }

    /**
     * 使用指定大小构造pr
     */
    public PipedReader(int pipeSize) {
    	initPipe(pipeSize);
    }

    //初始化buf大小
    private void initPipe(int pipeSize) {
		if (pipeSize <= 0) {
		    throw new IllegalArgumentException("Pipe size <= 0");
		}
		buffer = new char[pipeSize];
    }

    /**
     * 调用与此流绑定的pw的connect方法、将此流与对应的pw绑定
     */
    public void connect(PipedWriter src) throws IOException {
    	src.connect(this);
    }
    
    /**
     * pw调用此流的此方法、向pr的buf以整数形式中写入一个字符。
     */
    synchronized void receive(int c) throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByWriter || closedByReader) {
        	throw new IOException("Pipe closed");
        } else if (readSide != null && !readSide.isAlive()) {
            throw new IOException("Read end dead");
        }

		writeSide = Thread.currentThread();
		while (in == out) {
		    if ((readSide != null) && !readSide.isAlive()) {
		    	throw new IOException("Pipe broken");
		    }
		    //buf中写入的被读取完、唤醒所有此对象监控的线程其他方法、如果一秒钟之后还是满值、则再次唤醒其他方法、直到buf中被读取。
		    notifyAll();	
		    try {
		        wait(1000);
		    } catch (InterruptedException ex) {
		    	throw new java.io.InterruptedIOException();
		    }
		}
		//buf中存放第一个字符时、将字符在buf中存放位置的下标in初始化为0、读取的下标也初始化为0、准备接受写入的第一个字符。
		if (in < 0) {
		    in = 0;
		    out = 0;
		}
		buffer[in++] = (char) c;
		//如果buf中放满了、则再从头开始存放。
		if (in >= buffer.length) {
		    in = 0;
		}
    }

    /**
     * 将c中一部分字符写入到buf中。
     */
    synchronized void receive(char c[], int off, int len)  throws IOException {
		while (--len >= 0) {
		    receive(c[off++]);
		}
    }

    /**
     * 提醒所有等待的线程、已经接收到了最后一个字符、PipedWriter已关闭。用于PipedWriter的close()方法.
     */
    synchronized void receivedLast() {
		closedByWriter = true;
		notifyAll();
    }

    /**
     * 从buf中读取一个字符、以整数形式返回
     */
    public synchronized int read()  throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByReader) {
		    throw new IOException("Pipe closed");
		} else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) {
            throw new IOException("Write end dead");
        }

        readSide = Thread.currentThread();
		int trials = 2;
		while (in < 0) {
		    if (closedByWriter) { 
			/* closed by writer, return EOF */
			return -1;
		    }
		    if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
			throw new IOException("Pipe broken");
		    }
	            /* might be a writer waiting */
		    notifyAll();
		    try {
		        wait(1000);
		    } catch (InterruptedException ex) {
			throw new java.io.InterruptedIOException();
		    }
	 	}
		int ret = buffer[out++];
		if (out >= buffer.length) {
		    out = 0;
		}
		if (in == out) {
	            /* now empty */
		    in = -1;		
		}
		return ret;
    }

    /**
     * 将buf中读取一部分字符到cbuf中。
     */
    public synchronized int read(char cbuf[], int off, int len)  throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByReader) {
		    throw new IOException("Pipe closed");
		} else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) {
            throw new IOException("Write end dead");
        }

        if ((off < 0) || (off > cbuf.length) || (len < 0) ||
            ((off + len) > cbuf.length) || ((off + len) < 0)) {
		    throw new IndexOutOfBoundsException();
		} else if (len == 0) {
		    return 0;
		}

        /* possibly wait on the first character */
		int c = read();		
		if (c < 0) {
		    return -1;
		}
		cbuf[off] =  (char)c;
		int rlen = 1;
		while ((in >= 0) && (--len > 0)) {
		    cbuf[off + rlen] = buffer[out++];
		    rlen++;
		    //如果读取的下一个字符下标大于buffer的size、则重置out、从新开始从第一个开始读取。
		    if (out >= buffer.length) {
		    	out = 0;
		    }
		    //如果下一个写入字符的下标与下一个被读取的下标相同、则清空buf
		    if (in == out) {
	                /* now empty */
		    	in = -1;	
		    }
		}
		return rlen;
    }

    /**
     * 查看此流是否可读、看各个线程是否关闭、以及buffer中是否有可供读取的字符。
     */
    public synchronized boolean ready() throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByReader) {
	    throw new IOException("Pipe closed");
	} else if (writeSide != null && !writeSide.isAlive()
                   && !closedByWriter && (in < 0)) {
            throw new IOException("Write end dead");
        }
        if (in < 0) {
            return false;
        } else {
            return true;
        }
    }
 
    /**
     * 清空buf中数据、关闭此流。
     */
    public void close()  throws IOException {
		in = -1;
		closedByReader = true;
    }
}


4、实例演示:


用于发送字符的线程:CharSenderThread

package com.chy.io.original.thread;

import java.io.IOException;
import java.io.PipedWriter;

@SuppressWarnings("all")
public class CharSenderThread implements Runnable {
	private PipedWriter pw = new PipedWriter();
	
	public PipedWriter getPipedWriter(){
		return pw;
	}
	@Override
	public void run() {
		//sendOneChar();
		//sendShortMessage();
		sendLongMessage();
	}

	private void sendOneChar(){
		try {
			pw.write("a".charAt(0));
			pw.flush();
			pw.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	private void sendShortMessage() {
		try {
			pw.write("this is a short message from CharSenderThread !".toCharArray());
			pw.flush();
			pw.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	private void sendLongMessage(){
		try {
			char[] b = new char[1028];
			//生成一个长度为1028的字符数组、前1020个是1、后8个是2。
			for(int i=0; i<1020; i++){
				b[i] = 'a';
			}
			for (int i = 1020; i <1028; i++) {
				b[i] = 'b';
			}
			pw.write(b);
			pw.flush();
			pw.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}

用于接收字符的线程: CharReceiveThread

package com.chy.io.original.thread;

import java.io.IOException;
import java.io.PipedReader;

@SuppressWarnings("all")
public class CharReceiverThread extends Thread {
	
	private PipedReader pr = new PipedReader();
	
	public PipedReader getPipedReader(){
		return pr;
	}
	@Override
	public void run() {
		//receiveOneChar();
		//receiveShortMessage();
		receiverLongMessage();
	}
	
	private void receiveOneChar(){
		try {
			int n = pr.read();
			System.out.println(n);
			pr.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	private void receiveShortMessage() {
		try {
			char[] b = new char[1024];
			int n = pr.read(b);
			System.out.println(new String(b, 0, n));
			pr.close();
			
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	private void receiverLongMessage(){
		try {
			char[] b = new char[2048];
			int count = 0;
			while(true){
				count = pr.read(b); 
				for (int i = 0; i < count; i++) {
					System.out.print(b[i]);
				}
				if(count == -1)
					break;
			}
			pr.close();
			
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
}

启动类:PipedWriterAndPipedReaderTest

package com.chy.io.original.test;

import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;

import com.chy.io.original.thread.CharReceiverThread;
import com.chy.io.original.thread.CharSenderThread;

public class PipedWriterAndPipedReaderTest {
	public static void main(String[] args) throws IOException{
		CharSenderThread cst = new CharSenderThread();
		CharReceiverThread crt = new CharReceiverThread();
		PipedWriter pw = cst.getPipedWriter();
		PipedReader pr = crt.getPipedReader();
		
		pw.connect(pr);
		
		/**
		 * 想想为什么下面这样写会报Piped not connect异常 ?
		 */
		//new Thread(new CharSenderThread()).start();
		//new CharReceiverThread().start();
		
		new Thread(cst).start();
		crt.start();
	}
}

两个线程中分别有三个方法、可以对应的每次放开一对方法来测试、还有这里最后一个读取1028个字符的方法用了死循环来读取、可以试试当不用死循环来读取会有什么不一样的效果?初始化字符的时候要用char = 'a' 而不是cahr = "a"、可自己想原因。。。

总结:


PipedReader、PipedWriter两者的结合如鸳鸯一般、离开哪一方都不能继续存在、同时又如连理枝一般、PipedWriter先通过connect(PipedReader sink)来确定关系、并初始化PipedReader状态、告诉PipedReader只能属于这个PipedWriter、connect =true、当想赠与PipedReader字符时、就直接调用receive(char c) 、receive(char[] b, int off, int len)来将字符或者字符数组放入pr的存折buffer中。站在PipedReader角度上、看上哪个PipedWriter时就暗示pw、将主动权交给pw、调用pw的connect将自己给他去登记。当想要花(将字符读取到程序中)字符了就从buffer中拿、但是自己又没有本事挣字符、所以当buffer中没有字符时、自己就等着、并且跟pw讲没有字符了、pw就会向存折(buffer)中存字符、当然、pw不会一直不断往里存、当存折是空的时候也不会主动存、怕花冒、就等着pr要、要才存。过到最后两个只通过buffer来知道对方的存在与否、每次从buffer中存或者取字符时都会看看对方是否安康、若安好则继续生活、若一方不在、则另一方也不愿独存!


更多IO内容:java_io 体系之目录


版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/iteye_563/article/details/82552845

智能推荐

十进制小数部分如何转化成二进制算法实现_小数进制转换编程-程序员宅基地

文章浏览阅读483次。十进制小数转化成二进制,就是不断地乘二,判断之后的这个数是否比1大,比1大则输出输出1,留下小数部分继续前面的操作。将3.75的小数部分转化为二进制。最后的输出结果是.11。_小数进制转换编程

免费OFD文件在线转PDF_在线生成一个 ofd文件-程序员宅基地

文章浏览阅读739次。ofd文件打不开?ofd怎么转换为pdf?本文将给告诉大家ofd是什么文件格式?ofd怎么打开?ofd怎么免费转换为配pdf文件等,以下是具体的方法:一,什么是OFD文件?OFD是我国电子公文交换和存储格式标准。OFD格式是我国自主可控的电子文件版式文档格式。OFD版式文件,版面固定、不跑版、所见即所得,可以视为计算机时代的“数字纸张”;是电子文档发布、数字化信息传播和存档的理想文档格式。OFD格式是当下对于全国产环境具有明显的优势。因此,在自主可控档案系统中,OFD格式无疑是自主可控档案系.._在线生成一个 ofd文件

14、HDFS 透明加密KMS_mapreduce读写sequencefile、mapfile、orcfile和parquetfil-程序员宅基地

文章浏览阅读3w次。HDFS中的数据会以block的形式保存在各台数据节点的本地磁盘中,但这些block都是明文的。通过Web UI页面找到Block的ID和副本位于的机器信息如果在操作系统中直接访问block所在的目录,通过Linux的cat命令是可以直接查看里面的内容的,且是明文。在datanode找到其文件为:HDFS透明加密(Transparent Encryption)支持端到端的透明加密,启用以后,对于一些需要加密的HDFS目录里的文件可以实现透明的加密和解密,而不需要修改用户的业务代码。_mapreduce读写sequencefile、mapfile、orcfile和parquetfile文件

Java SE 第三章 常用类 API_java se api常用类-程序员宅基地

文章浏览阅读92次。3.0 API概述https://www.oracle.com/cn/java/technologies/java-se-api-doc.html如何使用API看类的描述​ Random类是用于生成随机数的类看构造方法​ Random():无参构造方法 Random r = new Random();看成员方法​ public int nextInt(int n):产生的是一个[0,n)范围内的随机数调用方法: int number = r.nextInt(10_java se api常用类

3-16心电图多分类预测task01_机器学习在呼吸心跳信号检测中应用ti-程序员宅基地

文章浏览阅读285次。3-16心电图多分类预测task01一、赛题理解1.赛题理解1.数据概况1.评价指标二、baseline学习1.引入库2.读入数据3.数据预处理4.训练、测试数据准备5.模型训练总结提示:以下是本篇文章正文内容,下面案例可供参考一、赛题理解1.赛题理解以心电图心跳信号数据为背景,要求根据心电图感应数据预测心跳信号所属类别,其中心跳信号对应正常病例以及受不同心律不齐和心肌梗塞影响的病例,这是一个多分类的问题。1.数据概况以预测心电图心跳信号类别为任务,总数据量超过20万,主要为1列心跳信号序列数_机器学习在呼吸心跳信号检测中应用ti

【超好懂的比赛题解】第 45 届国际大学生程序设计竞赛(ICPC)亚洲区域赛(济南)_icpc国际大学生程序设计竞赛题目-程序员宅基地

文章浏览阅读930次,点赞2次,收藏2次。title : 第 45 届国际大学生程序设计竞赛(ICPC)亚洲区域赛(济南)tags : ACM,题解,练习记录。_icpc国际大学生程序设计竞赛题目

随便推点

工具系列:TensorFlow决策森林_(3)使用dtreeviz可视化-程序员宅基地

文章浏览阅读1.2k次,点赞19次,收藏19次。之前的教程演示了如何使用TensorFlow的决策森林(随机森林、梯度提升树和CART)分类器和回归器来准备数据、训练和评估。(我们将TensorFlow决策森林缩写为TF-DF。)您还学会了如何使用内置的函数可视化树,并显示特征重要性度量。本教程的目标是通过可视化更深入地解释分类器和回归器决策树。我们将查看详细的树结构图示,以及决策树如何划分特征空间以做出决策的描绘。树结构图帮助我们理解模型的行为,特征空间图帮助我们通过展示特征和目标变量之间的关系来理解数据。我们将使用的可视化库称为dtreeviz。_dtreeviz

MySQL8.0学习记录10 - 字符集与校对规则_mysql8.0存储系统元数据的字符集是-程序员宅基地

文章浏览阅读2.1k次。MySQL8.0字符集_mysql8.0存储系统元数据的字符集是

漫威所有电影的 按时间线的观影顺序-程序员宅基地

文章浏览阅读3.1k次。美国队长1 - 2011年惊奇队长 - 2019年钢铁侠1 - 2008年无敌浩克 - 2008年钢铁侠2 - 2010年雷神 - 2011年复仇者联盟 - 2012年雷神2 - 2013年钢铁侠3 - 2013年美国队长2 - 2014年复仇者联盟2 - 2015年银河护卫队 - 2017年蚁人 - 2015年美国队长3 - 2016年奇异博士 - 2016年银河护卫队2 - 2017..._漫威电影观看顺序时间线

PhotoZoom Classic 7中的新功能-程序员宅基地

文章浏览阅读142次。众所周知PhotoZoom Classic是家庭使用理想的放大图像软件。目前很多用户还在使用PhotoZoom Classic 6,对于PhotoZoom Classic 7还是有点陌生。其实在6代衍生下出了7代,7代比6代多了很多适用的功能。下面我们就介绍一下PhotoZoom Classic 7中的新功能。PhotoZoom Classic 6的功能我们就不过多介绍,主要介绍7代中特有的功..._photozoon的作用

tensorflow中tf.keras.models.Sequential()用法-程序员宅基地

文章浏览阅读4.6w次,点赞75次,收藏349次。tensorflow中tf.keras.models.Sequential()用法Sequential()方法是一个容器,描述了神经网络的网络结构,在Sequential()的输入参数中描述从输入层到输出层的网络结构model = tf.keras.models.Sequential([网络结构]) #描述各层网络网络结构举例:拉直层:tf.keras.layers.Flatten() #拉直层可以变换张量的尺寸,把输入特征拉直为一维数组,是不含计算参数的层全连接层:tf.ker._tf.keras.models.sequential

Java递归实现Fibonacci数列计算_用递归方法编程计算fibonacci数列:(n=10),fac.jpg-程序员宅基地

文章浏览阅读2.8k次。实现代码如下:public static int factorial(int n){ if (n <= 1){ return 1; } return factorial(n-1) + factorial(n-2); }测试代码如下:System.out.println(factorial(40));测..._用递归方法编程计算fibonacci数列:(n=10),fac.jpg

推荐文章

热门文章

相关标签