模式名称

Pipeline(流水线)模式

原文:http://www.uml.org.cn/j2ee/201909271.asp

模式解决的问题

有时一些线程的步奏比较冗长,而且由于每个阶段的结果与下阶段的执行有关系,又不能分开。

解决思路

可以将任务的处理分解为若干个处理阶段,上一个阶段任务的结果交给下一个阶段来处理,这样每个线程的处理是并行的,可以充分利用资源提高计算效率。

模式所使用的类:

Pipe
对处理阶段的抽象,负责对输入进行处理,并将输出作为下一个阶段的输入:process()用于接收前一个处理阶段的处理结果,作为该处理阶段的输入,init()初始化当前处理阶段对外提供的服务,shutdown()关闭当前处理阶段对外提供的服务,setNextPipe()设置当前处理阶段的下一个处理阶段。

PipeContext
对各个处理阶段的计算环境进行抽象,主要用于异常处理:handleError()用于对处理阶段泡醋的异常进行处理。

AbstractPipe
Pipe接口的抽象实现类:process()接收抢一个处理阶段的输入并调用其子类实现的doProcess方法对输入元素进行处理,init()保存对其参数中制定的PipeContext实例的引用,子类可根据需要覆盖该方法以实现其服务的初始化。

Shutdown 默认实现什么也不做,子类可根据需要覆盖该方法实现服务停止:setNextPipe()设置当前处理阶段的下一个处理阶段,doProcess()留给子类实现的抽象方法,用于实现其服务的初始化。

WorkerThreadPipeDecprator
基于工作线程的Pipe实现类,该Pipe实例会将接收到的输入元素存入队列,由制定个数的工作者线程对队列中输入元素进行处理,该类的自身主要负责工作者线程的生命周期的管理:process()接收前一个处理阶段的输入,并将其存入队列由工作者线程运行时取出进行处理,init()启动工作者线程并调用委托Pipe实例的init方法,shutdown()停止工作者线程并委托Pipe实例的shutdown方法,setNextPipe()调用委托Pipe实例的setNextPipe方法,dispatch()取队列中的输入元素并调用委托Pipe实例的process方法对其进行处理

ThreadPoolPopeDecorator
基于线程池的Pipe的实现类:process()接收前一个处理阶段的输入,并向线程池提交一个对该输入进行相应处理的任务,init()调用委托pipe实例的init方法,shutdown()关闭当前Pipe实例对外提供的服务并调用委托Pipe实例的shutdown方法,setNextPipe()调用委托Pipe实例的setNextPipe方法。

AbstractParallelPile类
AbstractPipe的子类,支持并行处理的Pipe实现类,该类对其每个输入元素(原始任务)生成相应的一组子任务,并以并行的方式去执行这些子任务,各个子任务的执行结果会被合并为相应原始任务的输出结果:bulidTasks()流给子类实现的抽象方法,用于根据制定的输入构造一组子任务,combineResults()留给子类实现的抽象方法,对各个并行子任务的处理结果进行合并,形成相应输入元素的输出结果。invokeParallel()实现以并行的方式执行一组任务,doProcess()实现该类对其输入的处理逻辑。

ConcreteParallelPipe类
由应用定义的AbstractParallelPipe的子类:buildTasks()根据指定的输入构造一组子任务,combineResults()对各个并行子任务的处理结果进行合并,形成相应输入元素的输出结果

Pipeline类
对符合Pipe的抽象:addPipe()往该Pipeline实例中添加一个Pipe实例。

SimplePipeline类
基于AbstractPipe的Pipeline接口的一个简单实现类:addPipe()往该Pipeline实例中添加一个Pipe实例,addAsWorkerThreadBasedPipe()将制定的Pipe实例用WorkerThreadPipeDecorator实例包装后加入Pipeline实例,addAsThreadPoolBasedPipe()将制定的Pipe实例用ThreadPoolPipeDecorator实例包装后加入Pipeline实例。

Pipeline模式的服务初始化序列图

示例代码

某系统需要一个数据同步的定时任务,该定时任务将数据库中符合制定条件的记录数据以文件的形式FTP传输(同步)到制定的主机上。该定时任务需要满足以下要求:

  1. 每个数据文件最多只包含N(如10000,具体可配置)条记录;当一个数据文件被写满时,其他代谢记录会被写入新的数据文件。

  2. 每个数据文件可能需要被传输到多台主机上。

  3. 本地要保留同步过的数据文件的备份。

因此,该定时任务需要做三件事情,都是比较耗时的操作,而且后面的操作还需要依赖前面操作的结果才能进行,不易拆分,如果只是用多线程,每个线程中仍然是按顺序串行处理也是不合适的,这样的话第二个步奏会出现多线程之间争夺资源导致时间浪费的问题,会更难完成任务,所以需要有Pipeline模式去执行。

数据同步定时任务

public class DataSynctask implements Runnable {public void run() {ResultSet rs = null;SimplePipeline<RecordSaveTask, String> pipeline = buildPipeline();pipeline.init(pipeline.newDefaultPipeContext());Connection dbConn = null;try {dbConn = getConnection();rs = qryRecords(dbConn);processRecords(rs.pipeline);} catch (Exception e) {e.printStackTrace();} finally {if (null != dbConn) {try {dbConn.close();} catch (SQLException e) {;}}pipeline.shutdown(360, TimeUnit.SECONDS);}private ResultSet qryRecords (Connection dbConn)
throws Exception {dbConn.setReadOnly(true);PreparedStatement ps = dbConn.prepareStatement("select id,productId,packageId,msisdn,operationTime,operationTyoe, " +"effectiveDate,dueDate from subscriptions order by operationTime",ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);ResultSet rs = ps.executeQuery();return rs;}private static Connection getConnection () throwsException {Connection dbConn = null;Class.forName("org.hsqldb.jdbc.JDBCDriver");dbConn = DriverManager.getConnectio("jdbc:hsqldb:hsql://192.168.1.105:9001", "SA", "");return dbConn;}private static Record makeRecordFrom (ResultSet rs)
throws SQLException {Record record = new Record();record.setId(rs.getInt("id"));record.setProductId(rs.getString("productId"));record.setPackageId(rs.getString("msisdn"));record.setOperationTime(rs.getTimestamp("operationTime"));record.setOperationType(rs.getInt("operationType"));record.setEffectiveDate(rs.getTimestamp("effectiveDate"));record.setDueDate(rs.getTimestamp("dueDate"));}private static class RecordSaveTask {public final Record[] records;public final int targetFileIndex;public final String recordDay;public RecordSaveTask(Record[] records, int targetFileIndex) {this.records = records;this.targetFileIndex = targetFileIndex;this.recordDay = null;}puclic RecordSaveTask(String recordDay, inttargeFileIndex) {this.records = null;this.targetFileIndex = targetFileIndex;this.recordDay = recordDay;}}private SimplePipeline<RecordSavetask, String> buildPipeline () {/*
* 线程的本质是重复利用一定数量的线程,而不是针对
每个任务
都有一个专门的工作者线程。
* 这里,各个Pipe的初始化完全可以在上游Pipe初始化
完毕后再
初始化其后继Pipe,二不必多
* 个Pipe同时初始化。
* 因此,这个初始化的动作可以由一个线程来处理。该线程处理
完各个Pipe的初始化后,可以继续
* 处理之后可能产生的任务,如出错处理。
* 所以,上述这些先后产生的任务可以由线程池中的
一个工作者
线程从头到尾执行。
*/final ExecutorService helperExecutor = Executors.newSingleThreadExecutor();final SimplePipeline<RecordSaveTask, String> pipeline = newSimplePipeline<RecordSaveFile, String>(helperExcecutor);Pipe<RecordSaveTask, File> stageSaveFile = new AbstractPipe<RecordSaveTask, File>() {final RecordWriter recordWriter = RecordWriter.getInstance();final Record[] records = task.records;File file;
if(null==records){file = recordWriter.write(records, task.targerFileIndex);}else{try {file = recordWriter.write(records.task.targetFileIndex);} catch (IOException e) {throw new PipeException(this, task, "Failed to save records", e);}}};
/*
* 由于这里的几个Pipe都是处理I/O的,为了避免使用锁
(以减少不必要的上下文切换)但又能
* 保证线程安全,故每个Pipe都采用单线程处理。
* 若各个Pipe要改用线程池来处理,需要注意:1
线程安全2)死锁。
*/pipeline.addAsWorkerThreadBasedPipe(stageSaveFile, 1);final String[][] ftpServerConfigs = retrieveFTPServConf();final ThreadPoolExecutor ftpExecutorService = new ThreadPoolExecutor(1ftpServerConfigs.length, 60, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(100), new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {if (!executor.isShutdown()) {try {executor.getQueue().put(r);} catch (InterruptedException e) {;}}}});Pipe<File, File, File> stageTransferFile = new AbstractParallelPipe<File, File,File>((new SynchronousQueue<file>(), ftpExecutorService)){Future[ftpServerConfigs.length];@Overridepublic void init (PipeContext piptCtx){super.init(pipeCtx);String[] ftpServerConfig;for (int i = 0; i < ftoServerConfigs.length; i++) {ftpServerConfig = ftpServerConfigs[i];ftpClientUtilHolders[i] = FTPClientUtil.newInstance(ftpServerConfig[0], ftpServerConfig[1], ftpServerConfig[2]);}}@Overrideprotected List<Callable<File>> buildTasks ( final File file){List<Callable<File>> tasks = new LinkedList<Callable<File>>();for (Fucture<FTPClientUtil> ftpClientUtilHolder :ftpClientUtilHolders) {tasks.add(new ParallelTask(fipClientUtilHolder, file));}return tasks;}@Overrideprotected File combineResults (List < Future < File >> subTaskResults)
throws Exception {if (0 == subTaskResults.size()) {return null;}File file = null;file = subTaskResults.get(0).get();return file;}@Overridepublic void shutdown ( long timeout, TimeUnit unit){super.shutdown(timeout / unit);ftpExecutorService.shutdown();try {ftpExecutorService.awaitTermination(timeout, unit);} catch (InterruptedException e) {;}for (Future<FTPClientUtil> ftpClientUtilHolder :ftpClientUtilHolders) {try {ftpClientUtilHolder.get().disconnect();} catch (Exception e) {;}}}class ParallelTask implements Callable<File> {public final Future<FTPClientUtil> ftpUtilHodler;public final File file2Transfer;public ParallelTask(Future<FTPClientUtil>ftpUtilHodler,File file2Transfer) {this.ftpUtilHodler = ftpUtilHodler;this.file2Transfer = file2Transfer;}@Overridepublic File call() throws Exception {File transferedFile = null;ftpUtilHodler.get().upload(file2Transfer);transferedFile = file2Transfer;return transferedFile;}}} ;pipeline.addAsWorkerThreadBasedPipe(stageTransferFile, 1);
//备份已经传输的数据文件Pipe<File, Void> stageBackupFile = new AbstractPipe<File, Void>() {@Overrideprotected Void doProcess(File transferedFile) throws PipeException {RecordWriter.backupFile(transferedFile);return null;}public void shutdown(long timeout, TimeUnit unit) {//所有文件备份完毕后,七里掉空文件夹RecordWriter.purgeDir();}};pipeline.addAsWorkerThreadBasedPipe(stageTransferFile,1);return pipeline;}private String[] retrieveFTPServConf () {String[][] ftpServerConfigs = new String[][]{{"192.168.1.105", "datacenter", "abc123"},{"192.168.1.105", "datacenter", "abc123"},{"192.168.1.105", "datacenter", "abc123"}};return ftpServerConfits;}private void processRecords (ResiltSet rs,Pipeline < RecordSaveTask, String > pipeline) throws Exception {Recprd record;Recprd[] records = new Record[Config.RECORD_SAVE_CHNK_SIZE];int targeFileIndex = 0;int nextTargetFileIndex = 0;int recordCountInTheDay = 0;int recordCountInTheFile = 0;String recordDay = null;String lastRecordDay = null;SimpleDateFormat sdf = new SimpleDateFormat("yyMMdd");while (rs.next()) {record = makeRecordFrom(rs);lastRecordDay = recordDay;recordDay = sdf.format(record.getOperationTime());if (recordDay.equals(lastRecordDay)) {records[recordCountInTheFile] = record;recordCountInTheDay++;} else {//实际以发生的不同日期记录文件切换if (null != lastRecprdDay) {pipeline.process(new RecordSaveTask(Arrays.copyOf(records,recordCountInTheFile).targetFileIndex));} else {pipeline.process(new RecordSaveTask(laskRecordDay, targeFileIndex));}
//在此之前,先将records中的内容写入文件records[0] = record;recordCountInTheFile = 0;}recordCountInTheDay = 1;}if (nextTargetFileIndex == targetFileIndex) {recordCountInTheFile++;if (0 == (recordCountInTheFile % Config.RECORD_SAVE_CHUNK_SIZE)) {pipeline.process(new RecordSaveTask(Arrays.copyOf(recirds,recordCountInTheFile), targetFileIndex));recordCountInTheFile = 0;}}nextTargetFileIndex = (recordCountInTheDay) /Config.MAX_RECORDS_PER_FILE;if (nextTargetFileIndex > targetFileIndex) {//预测到将发生同日期记录文件切换if (recordCountInTheFile > 1) {pipeline.process(new RecordSaveTask(Arrays.copyOf(records,recordCountInTheFile), targetFileIndex));} else {pipeline.process(new RecordSaveTask(recordDay, targetFileIndex));}recordCountInTheFile = 0;targetFileIndex = nextTargetFileIndex;} else if (nextTargetFileIndex < targetFileIndex) {//实际已发生的异日期记录文件切换,recordCountInTheFile保持当前值targetFileIndex = nextTargetFileIndex;}}if (recordCountInTheFile > 0) {pipeline.process(new RecordSaveTask(Arrays.copyOf(records,recordCountInTheFile), targetFileIndex));}}}

FTPClientUtil类

public class FTPClientUtil {private final FtpClient ftp = new FTPClient();private final Map<String, Boolean> dirCreateMap = new HashMap<String, Boolean>();/** helperExecutor是个静态变量,这使得newInstance方法在生成不同的FTPClientUtil实例是* 可以公用同一个线程池;* 模式角色:Promise.TaskExecutor*/private volatile static ExecurorService helperExecutor;static {helperExcurtor = new ThreadPoolExecutor(1,Runtime.getRuntime().availableProcessors() * 2,60, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(10), new ThreadFactory() {public Thread newThread(Runable r) {Thread t = new Thread(r);t.setDaemon(true);return t;}}, new ThreadPoolExecutor.CallerRunsPolicy());}//私有构造器private FTPClientUtil() {}//模式角色:Promise.Promisor.conmputepublic static Future<FTOClientUtil> newInstance(final String ftpServer, finalString userName, final String password) {Callable<FTPClientUtil> callable = new Callable<FTPClientUtil>() {@Overridepublic FTPClientUtil call() throws Exception {FTPClientUtil selt = new FTPClientUtil();selt.init(ftpServer, userName, password);return self;}};
//task相当于模式角色:Promise.Primisefinal FutureTask<FTOClientUtil> task = newFutureTask<FTOClientUtil>(callable);helperExecuror.execute(task);return task;}private void init(String ftpServer, String userName, String password) throwsException {FTPClientConfig config = new FTPClientConfig();ftp.configure(config);int reply;ftp.connect(ftpServer);System.out.print(ftp.getReplyString());reply = ftp.getReplyCode();if (!FTPReply.isPositiveCompletion(reply)) {ftp.disconnect();throw new RuntimeException("FTp server refufused connection.");}boolean isOK = ftp.login(uesrName, password);if (isOK) {System.out.println(fto, getReplyString());} else {throw new RuntimeException("Failed to login." + ftp.getReplyString());}reply = ftp.cwd("`/subspeync");if (!FTPReply.isPositiveCompletion(reply)) {ftp.disconnect();throw new RuntimeException("Failed to change working" +"directory.reply." + reply);} else {System.out.println(ftp.getReplyString());}ftp.setFileType(FTP.ASEII_FILE_TYPE);}public void upload(File file) throws Exception {InputStream dataIn = new BufferedInputStream(newFileInputStream(file)1024 * 8);boolean isOS;String dirName = file.getParent();String fileName = dirName + '/' + file.getName();ByteArrayInputStream checkFileInputSream = newByteArrayInputStream("".getBytes());try {if (!dirCreateMap.containskey(dirName)) {ftp.makeDirectory(dirName);dirCreateMap.put(dirName, null);try {isOK = ftp.storeFile(fileName, dataIn);} catch (IOException e) {throw new RuntimeException("Failed to upload " + file, e);}if (isOK) {ftp.storeFile(fileName + ".c", checkFileInputStream);} else {throw new RuntimeException("Failed to upload " + file + ",reply:" +"," + ftp.getReplyString());}}finally{dataIn.close();}}public void disconnect () {if (fto,isConnected){try {ftp.disconnect();} catch (IOException ioe) {//什么也不做}}}}}

RecordWriter类

 public class RecirdWriter {private static final RecoedWriter INSTANCE = new RecordWriter();//HashMap不是安全的,但RecordWriter实例是在单线程中使用的,因此不会产生问题private static Map<Sting, PrintWriter> printWriterMap = new HashMap<String, PrintWriter>();private static String baseDir;private static final char FIELD_SEPARATOR = '|';//SimpleDateFormat不是线程安全的,但RecordWriter实例是在单线程中使用的,因此不会产生问题。private static final SimpleDateFormat FILE_INDEX_FORMATTER = new DecimalFormat{ "0000" };private static final int RECORD_JOIN_SIZE = Config.RECORD_JOIN_SIZE;private static final FieldPosition FIELD_POS = new FieldPssition(DateFormat.Field.DAY_OF_MONTH);//私有构造器
private RecordWriter() {baseDir = System.getProperty("user.home") + "/tmp/ subspsync / ";}public static RecordWriter getInstance() {return INSTANCE;}        public File write(Record[] targetFileIndex) throws IOException {if (null == records || 0 == records.length) {throw new IllegalArgumentException("records is null or empty ");}int recordCount = records.length;String recordDay;recordDay = DIRECTORY_NAME_FORMATTER.format(records[0].getOperat ionTime());String fileKey = recordDay + '-' + targeFileIndex;PrintWriter pwr = printWriterMap.get(fileKey);if (null == pwr) {File file = new Fiole(baseDir + '/' + recordDay + "/subspeync-gw-"+ FILE_INDEX_FORMATTER.format(targetFileIndex) + ".dat");File dir = file.getParentFile();if (!dir.exists() && !dir.mkdirr()) {throw new IOException("No such directory:" + dir);}pwr = new PrintWriter(new BufferedWritern(new FileWriter(file, true), Cofig.WRITER_BUFFER_SIZE));printWriterMap.put(fileKey, pwr);}StringBuffer strBuf = new StringBuffer(40);int i = 0;for (Record record : records) {i++;pwr.print(String.valueOf(record.getId()));pwr.print(FIELD_SEPARATOR);pwr.print(record.getMsisdn());pwr.print(FIELD_SEPARATOR);pwr.print(record.getProductId());pwr.print(FIELD_SEPARATOR);pwr.print(record.getRackageId());pwr.print(FUEKD_SEPARATOR);pwr.print(String.valueOf(record.getOperationType()));pwr.print(FIELD_SEPARATOR);strBuf.delete(0, 40);pwr.print(sdf.format(recore.getOperationtime(),strBuff, FIELD_POS));pwr.print(FIELD_SEPARATOR);strBuf.delete(0, 40);pwr.print(sdf.format(recore.getOperationtime(),strBuff, FIELD_POS));strBuf.delete(0, 40);pwr.print(FIELD_SEPARATOR);pwr.print(sdf.format(record.getDueDate(),strBuf, FIELD_POS));pwr.print('\n');if (0 == (i % RECORD_JOIN_SIZE)) {pwr.flush();i = 0;//Thread.yield();}}if (i > 0) {pwr.flush();}File file = null;//处理当前文件中的最后一组记录if (recordCount < Config.RECORD_SAVE_CHUNK_SIZE) {pwr.close();file = new FIle(baseDir + '/' + recordDay + "/ subspsync - gw - "+ FILE_INDEX_FORMATER.format(targetFileIndex)+ ".dat");printWriterMap.remove(fileKey);}return file;}public File finishRecords(String recordDay, int targetFileIndex) {String fileKey = recordDay + '-' + targetFileIndex;PrintWriter pwr = printWriterMap.get(fileKey);File file = null;if (null != pwr) {pwr.flush();pwr.close();file = new File(baseDir + '/' + recordDay + "/ subspsync - gw - " + FILE_INDEX_FORMATER.format(targetFileIndex) + ".dat");printWriterMap.remove(fileKey);}retutn file;}public static void backupFile(final File file) {String recordDay = file.getParentFile().getName();File destFile = new File(baseDir + "/backup" +recordDay);if (!destFile.exists()) {throw new RuntimeException("Failed to backup file " + file);}file.delete();}public static void purgeDir() {File[] dirs = new File(baseDir).listFiles();for (File dir : dirs) {if (dir.isDirectory() && 0 == dir.list().length) {dir.delete();}}}}

模式考量

Pipeline模式可以对有依赖关系的任务实现并行处理。并行和并发编程中,为了提高并发性我们旺旺需要将规模较大的任务分解撤柜若干个规模较小的子任务,这些子任务间同城没有依赖关系。而Pipeline模式则允许子任务间存在依赖关系的条件下实现并行运算。

Pipeline模式为用单线程模式编程提供了便利。多线程编程总的来说是复杂的,不仅代码编写比较复杂,出现问题也不好定位,多线程出现非预期结果是,开发人员不仅要考虑算法是否正确,还要考虑是否是多线程先关问题导致非预期的结果。相反,单线程编程就显得相对简单。Pipeline模式非常便于我们采用单线程模式实现对子任务的处理。

Pipeline模式中,每个Pipeline实例都是一个Pipe实例,因此,我们可以添加成其他实例,这就加大了该模式的扩展性和灵活性。

当然Pipeline模式也有一定的风险:Pipeline模式中各个处理阶段所用的工作者线程或者线程池,表示各个阶段的输入/输出对象的创建和一定(进出队列)都有其自身的时间和空间开销,所以使用Pipeline模式的时候需要考虑它所付出的代价。建议处理规模较大的任务,否则可能得不偿失。

模式需要注意的东西

1.Pipeline的深度:Pipeline中Pipe的个数被称作Pipeline的深度。所以我们在用Pipeline的深度与JVM宿主机的CPU个数间的关系。如果Pipeline实例所处的任务多属于CPU密集行,那么深度最好不超过Ncpu。如果Pipeline所处理的任务多属于I/O密集型,那么Pipeline的深度最好不要超过2*Ncpu。

2.基于线程池的Pipe:如果Pipe实例使用线程池,由于有多个Pipe实例,更容易出现线程死锁的问题,需要仔细考虑。

3.错误处理:Pipe实例对其任务进行过程中跑出的异常可能需要相应Pipe实例之外进行处理。此时,处理方法通常有两种:一是各个Pipe实例捕获到异常后调用PipeContext实例的handleError进行错误处理。另一个是创建一个专门负责错我处理的Pipe实例,其他Pipe实例捕获异常后提交相关数据给该Pipe实例处理。

4.可配置的Pipeline:Pipeline模式可以用代码的方式将若干个Pipe实例添加,可以用配置文件的方式实现动态方式添加Pipe。

Pipeline(流水线)模式相关推荐

  1. 对《生产流水线模式》讨论的总结性回复

    我的上一篇文章<生产流水线模式>发布以后,引起了很多朋友的关注,大家发表了很多意见,现在我针对留言中大家提得比较多的问题,做一个总结性的回复. 问题一:我的敏捷开发架构是不是只实现了简单的 ...

  2. pipeline流水线及分布式流水线发布php项目

    创建一个基于pipeline流水线的项目 第一个选项是流水线脚本(不是shell脚本),右上角有一个脚本的范例 范例: 1:尝试自己写一个脚本 2:hello world脚本(里面是函数,输出的内容, ...

  3. Pipeline流水线及分布式流水线发布PHP项目及JAVA项目

    Jenkins的Pipeline流水线 主机名 IP地址 备注 Git 192.168.146.136 Git服务器 Jenkins 192.168.146.137 Jenkins服务器 Pipeli ...

  4. Jenkins骚操作第四章构建maven项目和Pipeline流水线项目构建

    文章目录 Jenkins构建Maven项目 1.Jenkins项目构建类型-自由风格项目构建 1.1.拉取代码 1.2.编译打包 1.3.部署 2.Jenkins项目构建类型(3)--Maven项目构 ...

  5. Pipeline流水线项目构建

    目录 Pipeline简介 概念 安装Pipeline插件 Scripted脚本式Pipeline Declarative声明式Pipeline 使用代码生成器生成流水线脚本 管理Jenkinsfil ...

  6. DevOps流水线(1)什么是Pipeline流水线?

    从头开始构建 DevOps 流水线.推动该计划的核心技术是 Jenkins,这是一个用于建立持续集成和持续交付(CI/CD)流水线的开源工具. 在花旗,有一个单独的团队为专用的 Jenkins 流水线 ...

  7. Pipeline流水线-通过Jenkinsfile构建任务

    写在前面 作为CI/CD工具的宠儿,jenkins深受java程序员.k8s领域的喜爱.jenkins有广泛的插件,可以支撑多种应用场景.虽然jenkins的权限管理让人感到可惜,但是基于庞大的用户群 ...

  8. Go编程模式--流水线模式

    流水线工作模型在工业领域内十分常见,它将工作流程分为多个环节,每个环节根据工作强度安排合适的人员数量.良好的流水线设计尽量让各环节的流通率平衡,最大化提高产能效率. Go 是一门实用性语言,流水线工作 ...

  9. Redis 学习 - 05 Node.js 客户端操作 Redis、Pipeline 流水线

    使用编程语言客户端操作 Redis 目前我们进行的操作都是通过 Redis 的命令行客户端 redis-cli 进行的. 开发者也可以通过 Redis 图形管理软件操作,例如 RDM(Redis De ...

最新文章

  1. Linux08-日志
  2. Vue学习笔记(八) 组件进阶
  3. DrawArc绘制弧线
  4. 奥维地图导入西安坐标_带了坐标的照片,要上天了
  5. Docker版本Omnibus Gitlab 加Lets Encrypt免费SSL一键搭建
  6. Unity脚本的生命周期
  7. ai人工智能让女神_让女孩进入人工智能管道
  8. Fréchet Inception Distance(FID)
  9. US Shirt Size
  10. 一款非常实用的视频剪辑软件,它可以满足您进行视频制作的需要
  11. 查询出部门名称、部门的员工数、部门的平均工资、部门的最低收入雇员姓名和最高收入雇员的姓名...
  12. cloud-init 数据获取处理文档
  13. linux socket函数详解,linuxSocket_函数.doc
  14. springboot以FTP方式上传文件到远程服务器
  15. B. Partial Replacement
  16. [Python机器学习]Nagel-Schreckenberg(交通流)模型
  17. html转换为pdf php,js实现html转成pdf
  18. Step Hero NFT奇幻主题游戏
  19. 卡特尔16中人格分析,测试题,答案,以及解析
  20. 微信不显示通讯录好友

热门文章

  1. day03-事件绑定wxs脚本页面渲染页面事件
  2. BB10 Cascades Hello World样例分析
  3. 2022秋 | PTA编程训练(二)
  4. 探讨:在Mac新系统下安装新版XAMPP过程中遇到到的坑以及解决办法
  5. python小甲鱼安装方法_【Python】easygui小甲鱼
  6. 单片机快速入门:12864液晶屏实现多功能数字时钟
  7. 2020最全微服务:SpringBoot+Cloud+Docker
  8. 个人收集的java精品网站
  9. 遥感影像正射矫正及高分二号遥感影像获取
  10. ncurses.h 库安装及函数使用