Prebuilt plugin download: https://download.csdn.net/download/wyazyf/11286050

I. Download

1. Download the source package of the Kettle 8.2 release (version 8.2 specifically) from https://github.com/pentaho/pentaho-kettle/releases?after=7.1.0.23-R. The exact location is shown in the figure below.

2. Download the Kettle 8.2 release distribution (the tool itself) from https://community.hitachivantara.com/docs/DOC-1009855. The exact location is shown in the figure below.

II. Modification

1. Extract the source package. The resulting directory is named pentaho-kettle-8.2.0.0-R.

2. In the pentaho-kettle-8.2.0.0-R directory, edit the pom file as follows. (This fixes the problem of some packages not being found at compile time.)

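<repository>
  <id>pentaho-public</id>
  <name>Pentaho Public</name>
  <url>https://nexus.pentaho.org/repository/omni/</url>
  <releases>
    <enabled>true</enabled>
    <updatePolicy>daily</updatePolicy>
  </releases>
  <snapshots>
    <enabled>true</enabled>
    <updatePolicy>interval:15</updatePolicy>
  </snapshots>
</repository>
<repository>
  <id>pentaho-public1</id>
  <name>/</name>
  <url>http://oss.sonatype.org/content/groups/public/</url>
</repository>
<repository>
  <id>pentaho-public2</id>
  <name>/</name>
  <url>https://nexus.pentaho.org/repository/proxy-public-release/</url>
</repository>
<repository>
  <id>pentaho-public3</id>
  <name>/</name>
  <url>https://nexus.pentaho.org/repository/proxy-public-snapshot/</url>
</repository>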

3. In the pentaho-kettle-8.2.0.0-R\plugins directory, edit the pom file as follows. (This speeds up compilation.)

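<profile>
  <id>highdeps</id>
  <activation>
    <property>
      <name>!skipDefault</name>
    </property>
  </activation>
  <modules>
    <module>elasticsearch-bulk-insert</module>
  </modules>
</profile>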

4. In the C:\Users\DELL\Desktop\pentaho-kettle-8.2.0.0-R\plugins\elasticsearch-bulk-insert\core directory, edit the pom file as follows. (Add the ES7 client, comment out the transport client, and change the ES version number.)

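<properties>
  <pdi.version>8.2.0.0-342</pdi.version>
  <build.revision>${project.version}</build.revision>
  <timestamp>${maven.build.timestamp}</timestamp>
  <build.description>${project.description}</build.description>
  <maven.build.timestamp.format>yyyy/MM/dd hh:mm</maven.build.timestamp.format>
  <elasticsearch.version>7.2.0</elasticsearch.version>
</properties>

<dependency>
  <groupId>org.elasticsearch.client</groupId>
  <artifactId>elasticsearch-rest-high-level-client</artifactId>
  <version>7.2.0</version>
</dependency>
<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch</artifactId>
  <version>${elasticsearch.version}</version>
  <scope>compile</scope>
</dependency>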
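
For orientation: unlike the transport client, the REST high-level client added above talks to Elasticsearch over HTTP, and a bulk write ends up as a request to the `_bulk` endpoint whose body is newline-delimited JSON (one action line, then one source line per document). The following is only a JDK-only sketch of that body format, not code from the plugin or from the Elasticsearch client; the class `BulkBodySketch`, the `Doc` holder and the `escape` helper are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class BulkBodySketch {

    /** One document to write: an optional id plus an already-serialized, single-line JSON source. */
    static final class Doc {
        final String id;   // may be null, in which case Elasticsearch generates the id
        final String json; // JSON source of the document

        Doc(String id, String json) {
            this.id = id;
            this.json = json;
        }
    }

    /** Escapes backslashes and double quotes so a value can sit inside a JSON string (simplified). */
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /**
     * Builds the newline-delimited body of a _bulk request.
     * action is "index" (overwrite a document with the same id) or "create" (fail if the id exists).
     */
    static String bulkBody(String index, String action, List<Doc> docs) {
        StringBuilder sb = new StringBuilder();
        for (Doc d : docs) {
            // Action/metadata line
            sb.append("{\"").append(action).append("\":{\"_index\":\"").append(escape(index)).append('"');
            if (d.id != null) {
                sb.append(",\"_id\":\"").append(escape(d.id)).append('"');
            }
            sb.append("}}\n");
            // Source line; every line, including the last, ends with a newline
            sb.append(d.json).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<Doc> docs = new ArrayList<>();
        docs.add(new Doc("1", "{\"name\":\"a\"}"));
        docs.add(new Doc(null, "{\"name\":\"b\"}"));
        System.out.print(bulkBody("demo", "create", docs));
    }
}
```

The two action names correspond to the `OpType.INDEX` and `OpType.CREATE` values that the step chooses between, depending on whether an id field is set and overwriting is enabled.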

5. In the C:\Users\DELL\Desktop\pentaho-kettle-8.2.0.0-R\plugins\elasticsearch-bulk-insert\core\src\main\java\org\pentaho\di\trans\steps\elasticsearchbulk directory, replace the contents of ElasticSearchBulk.java with the following:

package org.pentaho.di.trans.steps.elasticsearchbulk;

import java.io.IOException;

import java.net.UnknownHostException;

import java.util.ArrayList;

import java.util.Date;

import java.util.List;

import java.util.Map;

import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.StringUtils;

import org.apache.http.HttpHost;

import org.elasticsearch.action.DocWriteRequest.OpType;

import org.elasticsearch.action.bulk.BulkItemResponse;

import org.elasticsearch.action.bulk.BulkRequest;

import org.elasticsearch.action.bulk.BulkResponse;

import org.elasticsearch.action.index.IndexRequest;

import org.elasticsearch.client.RequestOptions;

import org.elasticsearch.client.RestClient;

import org.elasticsearch.client.RestHighLevelClient;

import org.elasticsearch.client.transport.NoNodeAvailableException;

import org.elasticsearch.common.settings.Settings;

import org.elasticsearch.common.xcontent.XContentBuilder;

import org.elasticsearch.common.xcontent.XContentFactory;

import org.elasticsearch.common.xcontent.XContentType;

import org.pentaho.di.core.exception.KettleException;

import org.pentaho.di.core.exception.KettleStepException;

import org.pentaho.di.core.row.RowDataUtil;

import org.pentaho.di.core.row.RowMetaInterface;

import org.pentaho.di.core.row.ValueMetaInterface;

import org.pentaho.di.i18n.BaseMessages;

import org.pentaho.di.trans.Trans;

import org.pentaho.di.trans.TransMeta;

import org.pentaho.di.trans.step.BaseStep;

import org.pentaho.di.trans.step.StepDataInterface;

import org.pentaho.di.trans.step.StepInterface;

import org.pentaho.di.trans.step.StepMeta;

import org.pentaho.di.trans.step.StepMetaInterface;

import org.pentaho.di.trans.steps.elasticsearchbulk.ElasticSearchBulkMeta.Server;

/**

* Does bulk insert of data into ElasticSearch

*

* @author webdetails

* @since 16-02-2011

*/

public class ElasticSearchBulk extends BaseStep implements StepInterface {

private static final String INSERT_ERROR_CODE = null;

private static Class<?> PKG = ElasticSearchBulkMeta.class; // for i18n

private ElasticSearchBulkMeta meta;

private ElasticSearchBulkData data;

//  private Client client;

private RestHighLevelClient client;

private String index;

private String type;

//  BulkRequestBuilder currentRequest;

BulkRequest currentRequest = new BulkRequest();

private int batchSize = 2;

private boolean isJsonInsert = false;

private int jsonFieldIdx = 0;

private String idOutFieldName = null;

private Integer idFieldIndex = null;

private Long timeout = null;

private TimeUnit timeoutUnit = TimeUnit.MILLISECONDS;

private int numberOfErrors = 0;

//  private List<IndexRequestBuilder> requestsBuffer;

private List<IndexRequest> requestsBuffer;

private boolean stopOnError = true;

private boolean useOutput = true;

private Map<String, String> columnsToJson;

private boolean hasFields;

private IndexRequest.OpType opType = org.elasticsearch.action.DocWriteRequest.OpType.CREATE;

public ElasticSearchBulk( StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr, TransMeta transMeta,

Trans trans ) {

super( stepMeta, stepDataInterface, copyNr, transMeta, trans );

}

public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws KettleException {

Object[] rowData = getRow();

if ( rowData == null ) {

if ( currentRequest != null && currentRequest.numberOfActions() > 0 ) {

processBatch( false );

}

setOutputDone();

return false;

}

if ( first ) {

first = false;

setupData();

//      currentRequest = client.prepareBulk();

//      requestsBuffer = new ArrayList<IndexRequestBuilder>( this.batchSize );

//   try {

//            client.bulk(currentRequest, RequestOptions.DEFAULT);

// } catch (IOException e1) {

//     rejectAllRows( e1.getLocalizedMessage() );

//       String msg = BaseMessages.getString( PKG, "ElasticSearchBulk.Log.Exception", e1.getLocalizedMessage() );

//       logError( msg );

//       throw new KettleStepException( msg, e1 );

// }

requestsBuffer = new ArrayList<>( this.batchSize );

initFieldIndexes();

}

try {

data.inputRowBuffer[data.nextBufferRowIdx++] = rowData;

return indexRow( data.inputRowMeta, rowData ) || !stopOnError;

} catch ( KettleStepException e ) {

throw e;

} catch ( Exception e ) {

rejectAllRows( e.getLocalizedMessage() );

String msg = BaseMessages.getString( PKG, "ElasticSearchBulk.Log.Exception", e.getLocalizedMessage() );

logError( msg );

throw new KettleStepException( msg, e );

}

}

/**

* Initialize this.data

*

* @throws KettleStepException

*/

private void setupData() throws KettleStepException {

data.nextBufferRowIdx = 0;

data.inputRowMeta = getInputRowMeta().clone(); // only available after first getRow();

data.inputRowBuffer = new Object[batchSize][];

data.outputRowMeta = data.inputRowMeta.clone();

meta.getFields( data.outputRowMeta, getStepname(), null, null, this, repository, metaStore );

}

private void initFieldIndexes() throws KettleStepException {

if ( isJsonInsert ) {

Integer idx = getFieldIdx( data.inputRowMeta, environmentSubstitute( meta.getJsonField() ) );

if ( idx != null ) {

jsonFieldIdx = idx.intValue();

} else {

throw new KettleStepException( BaseMessages.getString( PKG, "ElasticSearchBulk.Error.NoJsonField" ) );

}

}

idOutFieldName = environmentSubstitute( meta.getIdOutField() );

if ( StringUtils.isNotBlank( meta.getIdInField() ) ) {

idFieldIndex = getFieldIdx( data.inputRowMeta, environmentSubstitute( meta.getIdInField() ) );

if ( idFieldIndex == null ) {

throw new KettleStepException( BaseMessages.getString( PKG, "ElasticSearchBulk.Error.InvalidIdField" ) );

}

} else {

idFieldIndex = null;

}

}

private static Integer getFieldIdx( RowMetaInterface rowMeta, String fieldName ) {

if ( fieldName == null ) {

return null;

}

for ( int i = 0; i < rowMeta.size(); i++ ) {

String name = rowMeta.getValueMeta( i ).getName();

if ( fieldName.equals( name ) ) {

return i;

}

}

return null;

}

/**

* @param rowMeta The metadata for the row to be indexed

* @param row     The data for the row to be indexed

*/

private boolean indexRow( RowMetaInterface rowMeta, Object[] row ) throws KettleStepException {

try {

//      IndexRequestBuilder requestBuilder = client.prepareIndex( index, type );

//      requestBuilder.setOpType( this.opType );

IndexRequest indexRequest = new IndexRequest(index);

indexRequest.type(type);

indexRequest.opType(this.opType);

if ( idFieldIndex != null ) {

//        requestBuilder.setId( "" + row[idFieldIndex] ); // "" just in case field isn't string

indexRequest.id("" + row[idFieldIndex]);

}

if ( isJsonInsert ) {

//        addSourceFromJsonString( row, requestBuilder );

addSourceFromJsonString( row, indexRequest );

} else {

//        addSourceFromRowFields( requestBuilder, rowMeta, row );

addSourceFromRowFields( indexRequest, rowMeta, row );

}

// currentRequest = new BulkRequest();

//      currentRequest.add( requestBuilder );

//      requestsBuffer.add( requestBuilder );

currentRequest.add( indexRequest );

requestsBuffer.add( indexRequest );

if ( currentRequest.numberOfActions() >= batchSize ) {

return processBatch( true );

} else {

return true;

}

} catch ( KettleStepException e ) {

throw e;

} catch ( NoNodeAvailableException e ) {

throw new KettleStepException( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.Error.NoNodesFound" ) );

} catch ( Exception e ) {

throw new KettleStepException( BaseMessages.getString( PKG, "ElasticSearchBulk.Log.Exception", e

.getLocalizedMessage() ), e );

}

}

//  /**

//   * @param row

//   * @param requestBuilder

//   */

//  private void addSourceFromJsonString( Object[] row, IndexRequestBuilder requestBuilder ) throws KettleStepException {

//    Object jsonString = row[jsonFieldIdx];

//    if ( jsonString instanceof byte[] ) {

//      requestBuilder.setSource( (byte[]) jsonString, XContentType.JSON );

//    } else if ( jsonString instanceof String ) {

//      requestBuilder.setSource( (String) jsonString, XContentType.JSON );

//    } else {

//      throw new KettleStepException( BaseMessages.getString( "ElasticSearchBulk.Error.NoJsonFieldFormat" ) );

//    }

//  }

/**

* @param row

* @param indexRequest

*/

private void addSourceFromJsonString( Object[] row, IndexRequest indexRequest ) throws KettleStepException {

Object jsonString = row[jsonFieldIdx];

if ( jsonString instanceof byte[] ) {

indexRequest.source( (byte[]) jsonString, XContentType.JSON );

} else if ( jsonString instanceof String ) {

indexRequest.source( (String) jsonString, XContentType.JSON );

} else {

throw new KettleStepException( BaseMessages.getString( PKG, "ElasticSearchBulk.Error.NoJsonFieldFormat" ) );

}

}

//  /**

//   * @param requestBuilder

//   * @param rowMeta

//   * @param row

//   * @throws IOException

//   */

//  private void addSourceFromRowFields( IndexRequestBuilder requestBuilder, RowMetaInterface rowMeta, Object[] row )

//          throws IOException {

//    XContentBuilder jsonBuilder = XContentFactory.jsonBuilder().startObject();

//

//    for ( int i = 0; i < rowMeta.size(); i++ ) {

//      if ( idFieldIndex != null && i == idFieldIndex ) { // skip id

//        continue;

//      }

//

//      ValueMetaInterface valueMeta = rowMeta.getValueMeta( i );

//      String name = hasFields ? columnsToJson.get( valueMeta.getName() ) : valueMeta.getName();

//      Object value = row[i];

//      if ( value instanceof Date && value.getClass() != Date.class ) {

//        Date subDate = (Date) value;

//        // create a genuine Date object, or jsonBuilder will not recognize it

//        value = new Date( subDate.getTime() );

//      }

//      if ( StringUtils.isNotBlank( name ) ) {

//        jsonBuilder.field( name, value );

//      }

//    }

//

//    jsonBuilder.endObject();

//    requestBuilder.setSource( jsonBuilder );

//  }

/**

* @param indexRequest

* @param rowMeta

* @param row

* @throws IOException

*/

private void addSourceFromRowFields( IndexRequest indexRequest, RowMetaInterface rowMeta, Object[] row )

throws IOException {

XContentBuilder jsonBuilder = XContentFactory.jsonBuilder().startObject();

for ( int i = 0; i < rowMeta.size(); i++ ) {

if ( idFieldIndex != null && i == idFieldIndex ) { // skip id

continue;

}

ValueMetaInterface valueMeta = rowMeta.getValueMeta( i );

String name = hasFields ? columnsToJson.get( valueMeta.getName() ) : valueMeta.getName();

Object value = row[i];

if ( value instanceof Date && value.getClass() != Date.class ) {

Date subDate = (Date) value;

// create a genuine Date object, or jsonBuilder will not recognize it

value = new Date( subDate.getTime() );

}

if ( StringUtils.isNotBlank( name ) ) {

jsonBuilder.field( name, value );

}

}

jsonBuilder.endObject();

indexRequest.source( jsonBuilder );

}

public boolean init( StepMetaInterface smi, StepDataInterface sdi ) {

meta = (ElasticSearchBulkMeta) smi;

data = (ElasticSearchBulkData) sdi;

if ( super.init( smi, sdi ) ) {

try {

numberOfErrors = 0;

initFromMeta();

initClient();

return true;

} catch ( Exception e ) {

logError( BaseMessages.getString( PKG, "ElasticSearchBulk.Log.ErrorOccurredDuringStepInitialize" )

+ e.getMessage() );

}

return true;

}

return false;

}

private void initFromMeta() {

index = environmentSubstitute( meta.getIndex() );

type = environmentSubstitute( meta.getType() );

batchSize = meta.getBatchSizeInt( this );

try {

timeout = Long.parseLong( environmentSubstitute( meta.getTimeOut() ) );

} catch ( NumberFormatException e ) {

timeout = null;

}

timeoutUnit = meta.getTimeoutUnit();

isJsonInsert = meta.isJsonInsert();

useOutput = meta.isUseOutput();

stopOnError = meta.isStopOnError();

columnsToJson = meta.getFieldsMap();

this.hasFields = columnsToJson.size() > 0;

this.opType =

StringUtils.isNotBlank( meta.getIdInField() ) && meta.isOverWriteIfSameId() ? OpType.INDEX : OpType.CREATE;

}

private boolean processBatch( boolean makeNew ) throws KettleStepException {

BulkResponse response = null;

//    ActionFuture<BulkResponse> actionFuture = currentRequest.execute();

try{

response = client.bulk(currentRequest, RequestOptions.DEFAULT);

} catch (IOException e1) {

rejectAllRows( e1.getLocalizedMessage() );

String msg = BaseMessages.getString( PKG, "ElasticSearchBulk.Log.Exception", e1.getLocalizedMessage() );

logError( msg );

throw new KettleStepException( msg, e1 );

}

boolean responseOk = false;

//    try {

//      if ( timeout != null && timeoutUnit != null ) {

//        response = actionFuture.actionGet( timeout, timeoutUnit );

//      } else {

//        response = actionFuture.actionGet();

//      }

//    } catch ( ElasticsearchException e ) {

//      String msg = BaseMessages.getString( PKG, "ElasticSearchBulk.Error.BatchExecuteFail", e.getLocalizedMessage() );

//      if ( e instanceof ElasticsearchTimeoutException ) {

//        msg = BaseMessages.getString( PKG, "ElasticSearchBulk.Error.Timeout" );

//      }

//      logError( msg );

//      rejectAllRows( msg );

//    }

if ( response != null ) {

responseOk = handleResponse( response );

requestsBuffer.clear();

} else { // have to assume all failed

numberOfErrors += currentRequest.numberOfActions();

setErrors( numberOfErrors );

}

if ( makeNew ) {

//      currentRequest = client.prepareBulk();

// Start a fresh, empty bulk request. The batch that was just sent must not be sent again.

currentRequest = new BulkRequest();

data.nextBufferRowIdx = 0;

data.inputRowBuffer = new Object[batchSize][];

} else {

currentRequest = null;

data.inputRowBuffer = null;

}

return responseOk;

}

/**

* @param response

* @return true if no errors

*/

private boolean handleResponse( BulkResponse response ) {

boolean hasErrors = response.hasFailures();

if ( hasErrors ) {

logError( response.buildFailureMessage() );

}

int errorsInBatch = 0;

if ( hasErrors || useOutput ) {

for ( BulkItemResponse item : response ) {

if ( item.isFailed() ) {

// log

logDetailed( item.getFailureMessage() );

errorsInBatch++;

if ( getStepMeta().isDoingErrorHandling() ) {

rejectRow( item.getItemId(), item.getFailureMessage() );

}

} else if ( useOutput ) {

if ( idOutFieldName != null ) {

addIdToRow( item.getId(), item.getItemId() );

}

echoRow( item.getItemId() );

}

}

}

numberOfErrors += errorsInBatch;

setErrors( numberOfErrors );

int linesOK = currentRequest.numberOfActions() - errorsInBatch;

if ( useOutput ) {

setLinesOutput( getLinesOutput() + linesOK );

} else {

setLinesWritten( getLinesWritten() + linesOK );

}

return !hasErrors;

}

private void addIdToRow( String id, int rowIndex ) {

data.inputRowBuffer[rowIndex] =

RowDataUtil.resizeArray( data.inputRowBuffer[rowIndex], getInputRowMeta().size() + 1 );

data.inputRowBuffer[rowIndex][getInputRowMeta().size()] = id;

}

/**

* Send input row to output

*

* @param rowIndex

*/

private void echoRow( int rowIndex ) {

try {

putRow( data.outputRowMeta, data.inputRowBuffer[rowIndex] );

} catch ( KettleStepException e ) {

logError( e.getLocalizedMessage() );

} catch ( ArrayIndexOutOfBoundsException e ) {

logError( e.getLocalizedMessage() );

}

}

/**

* Send input row to error.

*

* @param index

* @param errorMsg

*/

private void rejectRow( int index, String errorMsg ) {

try {

putError( getInputRowMeta(), data.inputRowBuffer[index], 1, errorMsg, null, INSERT_ERROR_CODE );

} catch ( KettleStepException e ) {

logError( e.getLocalizedMessage() );

} catch ( ArrayIndexOutOfBoundsException e ) {

logError( e.getLocalizedMessage() );

}

}

private void rejectAllRows( String errorMsg ) {

for ( int i = 0; i < data.nextBufferRowIdx; i++ ) {

rejectRow( i, errorMsg );

}

}

private void initClient() throws UnknownHostException {

Settings.Builder settingsBuilder = Settings.builder();

settingsBuilder.put( Settings.Builder.EMPTY_SETTINGS );

meta.getSettingsMap().entrySet().stream().forEach( ( s ) -> settingsBuilder.put( s.getKey(),

environmentSubstitute( s.getValue() ) ) );

//    PreBuiltTransportClient tClient = new PreBuiltTransportClient( settingsBuilder.build() );

//

//    for ( Server server : meta.getServers() ) {

//      tClient.addTransportAddress( new TransportAddress(

//              InetAddress.getByName( environmentSubstitute( server.getAddress() ) ),

//              server.getPort() ) );

//    }

//

//    client = tClient;

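// Build a single client over all configured servers instead of one client per server.

List<HttpHost> hosts = new ArrayList<>();

for ( Server server : meta.getServers() ) {

hosts.add( new HttpHost( environmentSubstitute( server.getAddress() ), Integer.valueOf( server.getPort() ), "http" ) );

}

client = new RestHighLevelClient( RestClient.builder( hosts.toArray( new HttpHost[0] ) ) );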

/** With the upgrade to elasticsearch 6.3.0, removed the NodeBuilder,

*  which was removed from the elasticsearch 5.0 API, see:

*  https://www.elastic.co/guide/en/elasticsearch/reference/5.0/breaking_50_java_api_changes

*  .html#_nodebuilder_removed

*/

}

private void disposeClient() throws IOException{

if ( client != null ) {

client.close();

}

}

public void dispose( StepMetaInterface smi, StepDataInterface sdi ) {

meta = (ElasticSearchBulkMeta) smi;

data = (ElasticSearchBulkData) sdi;

try {

disposeClient();

} catch ( Exception e ) {

logError( e.getLocalizedMessage(), e );

}

super.dispose( smi, sdi );

}

}
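
The row handling in the class above follows a common batching pattern: buffer incoming rows, send them once `batchSize` is reached, and flush whatever is left when the input ends. The following JDK-only sketch isolates that pattern. It is an illustration under stated assumptions rather than the plugin's code: `BatchingSketch` is a hypothetical name, and the `sender` callback merely stands in for the call to `client.bulk(...)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class BatchingSketch<T> {
    private final int batchSize;
    private final Consumer<List<T>> sender; // stands in for client.bulk(...)
    private List<T> current = new ArrayList<>();

    BatchingSketch(int batchSize, Consumer<List<T>> sender) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        this.batchSize = batchSize;
        this.sender = sender;
    }

    /** Buffers one row and flushes as soon as the batch is full. */
    void add(T row) {
        current.add(row);
        if (current.size() >= batchSize) {
            flush();
        }
    }

    /** Sends whatever is buffered, then starts a fresh batch so that no row is sent twice. */
    void flush() {
        if (current.isEmpty()) {
            return;
        }
        sender.accept(current);
        current = new ArrayList<>();
    }

    public static void main(String[] args) {
        List<List<String>> sent = new ArrayList<>();
        BatchingSketch<String> batcher = new BatchingSketch<>(2, sent::add);
        for (String row : new String[] {"r1", "r2", "r3", "r4", "r5"}) {
            batcher.add(row);
        }
        batcher.flush(); // end of input: send the partial last batch
        System.out.println(sent); // prints [[r1, r2], [r3, r4], [r5]]
    }
}
```

Replacing the buffer after each send matters for the step as well: item positions reported in a bulk response are relative to the request that was sent, so they only line up with the buffered input rows if every request starts empty.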

6. In the C:\Users\DELL\Desktop\pentaho-kettle-8.2.0.0-R\plugins\elasticsearch-bulk-insert\core\src\main\java\org\pentaho\di\ui\trans\steps\elasticsearchbulk directory, replace the contents of ElasticSearchBulkDialog.java with the following:

package org.pentaho.di.ui.trans.steps.elasticsearchbulk;

import java.util.Map;

import org.apache.commons.lang.StringUtils;

import org.apache.http.HttpHost;

import org.eclipse.swt.SWT;

import org.eclipse.swt.custom.CTabFolder;

import org.eclipse.swt.custom.CTabItem;

import org.eclipse.swt.events.FocusListener;

import org.eclipse.swt.events.ModifyEvent;

import org.eclipse.swt.events.ModifyListener;

import org.eclipse.swt.events.SelectionAdapter;

import org.eclipse.swt.events.SelectionEvent;

import org.eclipse.swt.events.SelectionListener;

import org.eclipse.swt.events.ShellAdapter;

import org.eclipse.swt.events.ShellEvent;

import org.eclipse.swt.layout.FormAttachment;

import org.eclipse.swt.layout.FormData;

import org.eclipse.swt.layout.FormLayout;

import org.eclipse.swt.widgets.Button;

import org.eclipse.swt.widgets.Composite;

import org.eclipse.swt.widgets.Control;

import org.eclipse.swt.widgets.Display;

import org.eclipse.swt.widgets.Event;

import org.eclipse.swt.widgets.Group;

import org.eclipse.swt.widgets.Label;

import org.eclipse.swt.widgets.Listener;

import org.eclipse.swt.widgets.MessageBox;

import org.eclipse.swt.widgets.Shell;

import org.eclipse.swt.widgets.Text;

import org.elasticsearch.action.admin.indices.get.GetIndexRequest;

import org.elasticsearch.client.RequestOptions;

import org.elasticsearch.client.RestClient;

import org.elasticsearch.client.RestHighLevelClient;

import org.elasticsearch.client.core.MainResponse;

import org.elasticsearch.client.transport.NoNodeAvailableException;

import org.elasticsearch.common.settings.Settings;

import org.elasticsearch.discovery.MasterNotDiscoveredException;

import org.pentaho.di.core.Const;

import org.pentaho.di.core.Props;

import org.pentaho.di.core.exception.KettleException;

import org.pentaho.di.core.row.RowMetaInterface;

import org.pentaho.di.i18n.BaseMessages;

import org.pentaho.di.trans.TransMeta;

import org.pentaho.di.trans.step.BaseStepMeta;

import org.pentaho.di.trans.step.StepDialogInterface;

import org.pentaho.di.trans.steps.elasticsearchbulk.ElasticSearchBulkMeta;

import org.pentaho.di.trans.steps.elasticsearchbulk.ElasticSearchBulkMeta.Server;

import org.pentaho.di.ui.core.dialog.ErrorDialog;

import org.pentaho.di.ui.core.widget.ColumnInfo;

import org.pentaho.di.ui.core.widget.LabelComboVar;

import org.pentaho.di.ui.core.widget.LabelTextVar;

import org.pentaho.di.ui.core.widget.TableView;

import org.pentaho.di.ui.core.widget.TextVar;

import org.pentaho.di.ui.trans.step.BaseStepDialog;

public class ElasticSearchBulkDialog extends BaseStepDialog implements StepDialogInterface {

private ElasticSearchBulkMeta model;

private static Class<?> PKG = ElasticSearchBulkMeta.class;

private CTabFolder wTabFolder;

private FormData fdTabFolder;

private CTabItem wGeneralTab;

private Composite wGeneralComp;

private FormData fdGeneralComp;

private Label wlBatchSize;

private TextVar wBatchSize;

private LabelTextVar wIdOutField;

private Group wIndexGroup;

private FormData fdIndexGroup;

private Group wSettingsGroup;

private FormData fdSettingsGroup;

private String[] fieldNames;

private CTabItem wFieldsTab;

private LabelTextVar wIndex;

private LabelTextVar wType;

private ModifyListener lsMod;

private Button wIsJson;

private Label wlIsJson;

private Label wlUseOutput;

private Button wUseOutput;

private LabelComboVar wJsonField;

private TableView wFields;

private CTabItem wServersTab;

private TableView wServers;

private CTabItem wSettingsTab;

private TableView wSettings;

private LabelTimeComposite wTimeOut;

private Label wlStopOnError;

private Button wStopOnError;

private Button wTest;

private Button wTestCl;

private LabelComboVar wIdInField;

private Button wIsOverwrite;

private Label wlIsOverwrite;

public ElasticSearchBulkDialog( Shell parent, Object in, TransMeta transMeta, String sname ) {

super( parent, (BaseStepMeta) in, transMeta, sname );

model = (ElasticSearchBulkMeta) in;

}

public String open() {

Shell parent = getParent();

Display display = parent.getDisplay();

shell = new Shell( parent, SWT.DIALOG_TRIM | SWT.RESIZE | SWT.MAX | SWT.MIN );

props.setLook( shell );

setShellImage( shell, model );

lsMod = new ModifyListener() {

public void modifyText( ModifyEvent e ) {

model.setChanged();

}

};

changed = model.hasChanged();

FormLayout formLayout = new FormLayout();

formLayout.marginWidth = Const.FORM_MARGIN;

formLayout.marginHeight = Const.FORM_MARGIN;

shell.setLayout( formLayout );

shell.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.DialogTitle" ) );

int middle = props.getMiddlePct();

int margin = Const.MARGIN;

// Stepname line

wlStepname = new Label( shell, SWT.RIGHT );

wlStepname.setText( BaseMessages.getString( PKG, "System.Label.StepName" ) );

props.setLook( wlStepname );

fdlStepname = new FormData();

fdlStepname.left = new FormAttachment( 0, 0 );

fdlStepname.top = new FormAttachment( 0, margin );

fdlStepname.right = new FormAttachment( middle, -margin );

wlStepname.setLayoutData( fdlStepname );

wStepname = new Text( shell, SWT.SINGLE | SWT.LEFT | SWT.BORDER );

wStepname.setText( stepname );

props.setLook( wStepname );

wStepname.addModifyListener( lsMod );

fdStepname = new FormData();

fdStepname.left = new FormAttachment( middle, 0 );

fdStepname.top = new FormAttachment( 0, margin );

fdStepname.right = new FormAttachment( 100, 0 );

wStepname.setLayoutData( fdStepname );

wTabFolder = new CTabFolder( shell, SWT.BORDER );

props.setLook( wTabFolder, Props.WIDGET_STYLE_TAB );

// GENERAL TAB

addGeneralTab();

// Servers TAB

addServersTab();

// Fields TAB

addFieldsTab();

// Settings TAB

addSettingsTab();

//

// BUTTONS //

// //

wOK = new Button( shell, SWT.PUSH );

wOK.setText( BaseMessages.getString( PKG, "System.Button.OK" ) );

wCancel = new Button( shell, SWT.PUSH );

wCancel.setText( BaseMessages.getString( PKG, "System.Button.Cancel" ) );

setButtonPositions( new Button[]{wOK, wCancel}, margin, null );

fdTabFolder = new FormData();

fdTabFolder.left = new FormAttachment( 0, 0 );

fdTabFolder.top = new FormAttachment( wStepname, margin );

fdTabFolder.right = new FormAttachment( 100, 0 );

fdTabFolder.bottom = new FormAttachment( wOK, -margin );

wTabFolder.setLayoutData( fdTabFolder );

// //

// Std Listeners //

//

addStandardListeners();

wTabFolder.setSelection( 0 );

// Set the shell size, based upon previous time...

setSize();

getData( model );

model.setChanged( changed );

shell.open();

while ( !shell.isDisposed() ) {

if ( !display.readAndDispatch() ) {

display.sleep();

}

}

return stepname;

}

private void addStandardListeners() {

// Add listeners

lsOK = new Listener() {

public void handleEvent( Event e ) {

ok();

}

};

lsCancel = new Listener() {

public void handleEvent( Event e ) {

cancel();

}

};

lsMod = new ModifyListener() {

public void modifyText( ModifyEvent event ) {

model.setChanged();

}

};

wOK.addListener( SWT.Selection, lsOK );

wCancel.addListener( SWT.Selection, lsCancel );

lsDef = new SelectionAdapter() {

public void widgetDefaultSelected( SelectionEvent e ) {

ok();

}

};

wStepname.addSelectionListener( lsDef );

// window close

shell.addShellListener( new ShellAdapter() {

public void shellClosed( ShellEvent e ) {

cancel();

}

} );

}

/**

*/

private void addGeneralTab() {

wGeneralTab = new CTabItem( wTabFolder, SWT.NONE );

wGeneralTab.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.General.Tab" ) );

wGeneralComp = new Composite( wTabFolder, SWT.NONE );

props.setLook( wGeneralComp );

FormLayout generalLayout = new FormLayout();

generalLayout.marginWidth = 3;

generalLayout.marginHeight = 3;

wGeneralComp.setLayout( generalLayout );

// Index GROUP

fillIndexGroup( wGeneralComp );

// Options GROUP

fillOptionsGroup( wGeneralComp );

fdGeneralComp = new FormData();

fdGeneralComp.left = new FormAttachment( 0, 0 );

fdGeneralComp.top = new FormAttachment( wStepname, Const.MARGIN );

fdGeneralComp.right = new FormAttachment( 100, 0 );

fdGeneralComp.bottom = new FormAttachment( 100, 0 );

wGeneralComp.setLayoutData( fdGeneralComp );

wGeneralComp.layout();

wGeneralTab.setControl( wGeneralComp );

}

private void fillIndexGroup( Composite parentTab ) {

wIndexGroup = new Group( parentTab, SWT.SHADOW_NONE );

props.setLook( wIndexGroup );

wIndexGroup.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.IndexGroup.Label" ) );

FormLayout indexGroupLayout = new FormLayout();

indexGroupLayout.marginWidth = 10;

indexGroupLayout.marginHeight = 10;

wIndexGroup.setLayout( indexGroupLayout );

// Index

wIndex = new LabelTextVar( transMeta, wIndexGroup, BaseMessages.getString( PKG, "ElasticSearchBulkDialog.Index"

+ ".Label" ), BaseMessages.getString( PKG, "ElasticSearchBulkDialog.Index.Tooltip" ) );

wIndex.addModifyListener( lsMod );

// Type

wType =

new LabelTextVar( transMeta, wIndexGroup, BaseMessages.getString( PKG, "ElasticSearchBulkDialog.Type"

+ ".Label" ),

BaseMessages.getString( PKG, "ElasticSearchBulkDialog.Type.Tooltip" ) );

wType.addModifyListener( lsMod );

// Test button

wTest = new Button( wIndexGroup, SWT.PUSH );

wTest.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.TestIndex.Label" ) );

wTest.setToolTipText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.TestIndex.Tooltip" ) );

wTest.addListener( SWT.Selection, new Listener() {

public void handleEvent( Event arg0 ) {

test( TestType.INDEX );

}

} );

Control[] connectionControls = new Control[]{wIndex, wType};

placeControls( wIndexGroup, connectionControls );

BaseStepDialog.positionBottomButtons( wIndexGroup, new Button[]{wTest}, Const.MARGIN, wType );

fdIndexGroup = new FormData();

fdIndexGroup.left = new FormAttachment( 0, Const.MARGIN );

fdIndexGroup.top = new FormAttachment( wStepname, Const.MARGIN );

fdIndexGroup.right = new FormAttachment( 100, -Const.MARGIN );

wIndexGroup.setLayoutData( fdIndexGroup );

}

private void addServersTab() {

wServersTab = new CTabItem( wTabFolder, SWT.NONE );

wServersTab.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.ServersTab.TabTitle" ) );

FormLayout serversLayout = new FormLayout();

serversLayout.marginWidth = Const.FORM_MARGIN;

serversLayout.marginHeight = Const.FORM_MARGIN;

Composite wServersComp = new Composite( wTabFolder, SWT.NONE );

wServersComp.setLayout( serversLayout );

props.setLook( wServersComp );

// Test button

wTestCl = new Button( wServersComp, SWT.PUSH );

wTestCl.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.TestCluster.Label" ) );

wTestCl.setToolTipText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.TestCluster.Tooltip" ) );

wTestCl.addListener( SWT.Selection, new Listener() {

public void handleEvent( Event arg0 ) {

test( TestType.CLUSTER );

}

} );

setButtonPositions( new Button[]{wTestCl}, Const.MARGIN, null );

ColumnInfo[] columnsMeta = new ColumnInfo[2];

columnsMeta[0] =

new ColumnInfo( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.ServersTab.Address.Column" ),

ColumnInfo.COLUMN_TYPE_TEXT, false );

columnsMeta[0].setUsingVariables( true );

columnsMeta[1] =

new ColumnInfo( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.ServersTab.Port.Column" ),

ColumnInfo.COLUMN_TYPE_TEXT, true );

wServers =

new TableView( transMeta, wServersComp, SWT.BORDER | SWT.FULL_SELECTION | SWT.MULTI, columnsMeta, 1, lsMod,

props );

FormData fdServers = new FormData();

fdServers.left = new FormAttachment( 0, Const.MARGIN );

fdServers.top = new FormAttachment( 0, Const.MARGIN );

fdServers.right = new FormAttachment( 100, -Const.MARGIN );

fdServers.bottom = new FormAttachment( wTestCl, -Const.MARGIN );

wServers.setLayoutData( fdServers );

FormData fdServersComp = new FormData();

fdServersComp.left = new FormAttachment( 0, 0 );

fdServersComp.top = new FormAttachment( 0, 0 );

fdServersComp.right = new FormAttachment( 100, 0 );

fdServersComp.bottom = new FormAttachment( 100, 0 );

wServersComp.setLayoutData( fdServersComp );

wServersComp.layout();

wServersTab.setControl( wServersComp );

}

private void addSettingsTab() {

wSettingsTab = new CTabItem( wTabFolder, SWT.NONE );

wSettingsTab.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.SettingsTab.TabTitle" ) );

FormLayout serversLayout = new FormLayout();

serversLayout.marginWidth = Const.FORM_MARGIN;

serversLayout.marginHeight = Const.FORM_MARGIN;

Composite wSettingsComp = new Composite( wTabFolder, SWT.NONE );

wSettingsComp.setLayout( serversLayout );

props.setLook( wSettingsComp );

ColumnInfo[] columnsMeta = new ColumnInfo[2];

columnsMeta[0] =

new ColumnInfo( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.SettingsTab.Property.Column" ),

ColumnInfo.COLUMN_TYPE_TEXT, false );

columnsMeta[1] =

new ColumnInfo( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.SettingsTab.Value.Column" ),

ColumnInfo.COLUMN_TYPE_TEXT, false );

columnsMeta[1].setUsingVariables( true );

wSettings =

new TableView( transMeta, wSettingsComp, SWT.BORDER | SWT.FULL_SELECTION | SWT.MULTI, columnsMeta, 1, lsMod,

props );

FormData fdServers = new FormData();

fdServers.left = new FormAttachment( 0, Const.MARGIN );

fdServers.top = new FormAttachment( 0, Const.MARGIN );

fdServers.right = new FormAttachment( 100, -Const.MARGIN );

fdServers.bottom = new FormAttachment( 100, -Const.MARGIN );

wSettings.setLayoutData( fdServers );

FormData fdServersComp = new FormData();

fdServersComp.left = new FormAttachment( 0, 0 );

fdServersComp.top = new FormAttachment( 0, 0 );

fdServersComp.right = new FormAttachment( 100, 0 );

fdServersComp.bottom = new FormAttachment( 100, 0 );

wSettingsComp.setLayoutData( fdServersComp );

wSettingsComp.layout();

wSettingsTab.setControl( wSettingsComp );

}

private void addFieldsTab() {

wFieldsTab = new CTabItem( wTabFolder, SWT.NONE );

wFieldsTab.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.FieldsTab.TabTitle" ) );

FormLayout fieldsLayout = new FormLayout();

fieldsLayout.marginWidth = Const.FORM_MARGIN;

fieldsLayout.marginHeight = Const.FORM_MARGIN;

Composite wFieldsComp = new Composite( wTabFolder, SWT.NONE );

wFieldsComp.setLayout( fieldsLayout );

props.setLook( wFieldsComp );

wGet = new Button( wFieldsComp, SWT.PUSH );

wGet.setText( BaseMessages.getString( PKG, "System.Button.GetFields" ) );

wGet.setToolTipText( BaseMessages.getString( PKG, "System.Tooltip.GetFields" ) );

lsGet = new Listener() {

public void handleEvent( Event e ) {

getPreviousFields( wFields );

}

};

wGet.addListener( SWT.Selection, lsGet );

setButtonPositions( new Button[]{wGet}, Const.MARGIN, null );

final int fieldsRowCount = model.getFields().size();

String[] names = this.fieldNames != null ? this.fieldNames : new String[]{""};

ColumnInfo[] columnsMeta = new ColumnInfo[2];

columnsMeta[0] =

new ColumnInfo( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.NameColumn.Column" ),

ColumnInfo.COLUMN_TYPE_CCOMBO, names, false );

columnsMeta[1] =

new ColumnInfo( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.TargetNameColumn.Column" ),

ColumnInfo.COLUMN_TYPE_TEXT, false );

wFields =

new TableView( transMeta, wFieldsComp, SWT.BORDER | SWT.FULL_SELECTION | SWT.MULTI, columnsMeta,

fieldsRowCount,

lsMod, props );

FormData fdFields = new FormData();

fdFields.left = new FormAttachment( 0, Const.MARGIN );

fdFields.top = new FormAttachment( 0, Const.MARGIN );

fdFields.right = new FormAttachment( 100, -Const.MARGIN );

fdFields.bottom = new FormAttachment( wGet, -Const.MARGIN );

wFields.setLayoutData( fdFields );

FormData fdFieldsComp = new FormData();

fdFieldsComp.left = new FormAttachment( 0, 0 );

fdFieldsComp.top = new FormAttachment( 0, 0 );

fdFieldsComp.right = new FormAttachment( 100, 0 );

fdFieldsComp.bottom = new FormAttachment( 100, 0 );

wFieldsComp.setLayoutData( fdFieldsComp );

wFieldsComp.layout();

wFieldsTab.setControl( wFieldsComp );

}

private void fillOptionsGroup( Composite parentTab ) {

int margin = Const.MARGIN;

wSettingsGroup = new Group( parentTab, SWT.SHADOW_NONE );

props.setLook( wSettingsGroup );

wSettingsGroup.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.SettingsGroup.Label" ) );

FormLayout settingGroupLayout = new FormLayout();

settingGroupLayout.marginWidth = 10;

settingGroupLayout.marginHeight = 10;

wSettingsGroup.setLayout( settingGroupLayout );

// Timeout

wTimeOut =

new LabelTimeComposite( wSettingsGroup, BaseMessages.getString( PKG, "ElasticSearchBulkDialog.TimeOut"

+ ".Label" ),

BaseMessages.getString( PKG, "ElasticSearchBulkDialog.TimeOut.Tooltip" ) );

props.setLook( wTimeOut );

wTimeOut.addModifyListener( lsMod );

// BatchSize

wlBatchSize = new Label( wSettingsGroup, SWT.RIGHT );

wlBatchSize.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.BatchSize.Label" ) );

props.setLook( wlBatchSize );

wBatchSize = new TextVar( transMeta, wSettingsGroup, SWT.SINGLE | SWT.LEFT | SWT.BORDER );

props.setLook( wBatchSize );

wBatchSize.addModifyListener( lsMod );

// Stop on error

wlStopOnError = new Label( wSettingsGroup, SWT.RIGHT );

wlStopOnError.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.StopOnError.Label" ) );

wStopOnError = new Button( wSettingsGroup, SWT.CHECK | SWT.RIGHT );

wStopOnError.setToolTipText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.StopOnError.Tooltip" ) );

wStopOnError.addSelectionListener( new SelectionListener() {

public void widgetDefaultSelected( SelectionEvent arg0 ) {

widgetSelected( arg0 );

}

public void widgetSelected( SelectionEvent arg0 ) {

model.setChanged();

}

} );

// ID input

wIdInField =

new LabelComboVar( transMeta, wSettingsGroup, BaseMessages.getString( PKG,

"ElasticSearchBulkDialog.IdField.Label" ), BaseMessages.getString( PKG,

"ElasticSearchBulkDialog.IdField.Tooltip" ) );

props.setLook( wIdInField );

wIdInField.getComboWidget().setEditable( true );

wIdInField.addModifyListener( lsMod );

wIdInField.addFocusListener( new FocusListener() {

public void focusLost( org.eclipse.swt.events.FocusEvent e ) {

}

public void focusGained( org.eclipse.swt.events.FocusEvent e ) {

getPreviousFields( wIdInField );

}

} );

getPreviousFields( wIdInField );

wlIsOverwrite = new Label( wSettingsGroup, SWT.RIGHT );

wlIsOverwrite.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.Overwrite.Label" ) );

wIsOverwrite = new Button( wSettingsGroup, SWT.CHECK | SWT.RIGHT );

wIsOverwrite.setToolTipText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.Overwrite.Tooltip" ) );

wIsOverwrite.addSelectionListener( new SelectionListener() {

public void widgetDefaultSelected( SelectionEvent arg0 ) {

widgetSelected( arg0 );

}

public void widgetSelected( SelectionEvent arg0 ) {

model.setChanged();

}

} );

// Output rows

wlUseOutput = new Label( wSettingsGroup, SWT.RIGHT );

wlUseOutput.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.UseOutput.Label" ) );

wUseOutput = new Button( wSettingsGroup, SWT.CHECK | SWT.RIGHT );

wUseOutput.setToolTipText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.UseOutput.Tooltip" ) );

wUseOutput.addSelectionListener( new SelectionListener() {

public void widgetDefaultSelected( SelectionEvent arg0 ) {

widgetSelected( arg0 );

}

public void widgetSelected( SelectionEvent arg0 ) {

wIdOutField.setEnabled( wUseOutput.getSelection() );

model.setChanged();

}

} );

// ID out field

wIdOutField =

new LabelTextVar( transMeta, wSettingsGroup, BaseMessages.getString( PKG,

"ElasticSearchBulkDialog.IdOutField.Label" ), BaseMessages.getString( PKG,

"ElasticSearchBulkDialog.IdOutField.Tooltip" ) );

props.setLook( wIdOutField );

wIdOutField.setEnabled( wUseOutput.getSelection() );

wIdOutField.addModifyListener( lsMod );

// use json

wlIsJson = new Label( wSettingsGroup, SWT.RIGHT );

wlIsJson.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.IsJson.Label" ) );

wIsJson = new Button( wSettingsGroup, SWT.CHECK | SWT.RIGHT );

wIsJson.setToolTipText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.IsJson.Tooltip" ) );

wIsJson.addSelectionListener( new SelectionListener() {

public void widgetDefaultSelected( SelectionEvent arg0 ) {

widgetSelected( arg0 );

}

public void widgetSelected( SelectionEvent arg0 ) {

wJsonField.setEnabled( wIsJson.getSelection() );

wFields.setEnabled( !wIsJson.getSelection() );

wFields.setVisible( !wIsJson.getSelection() );

wGet.setEnabled( !wIsJson.getSelection() );

model.setChanged();

}

} );

// Json field

wJsonField =

new LabelComboVar( transMeta, wSettingsGroup, BaseMessages.getString( PKG,

"ElasticSearchBulkDialog.JsonField.Label" ), BaseMessages.getString( PKG,

"ElasticSearchBulkDialog.JsonField.Tooltip" ) );

wJsonField.getComboWidget().setEditable( true );

props.setLook( wJsonField );

wJsonField.addModifyListener( lsMod );

wJsonField.addFocusListener( new FocusListener() {

public void focusLost( org.eclipse.swt.events.FocusEvent e ) {

}

public void focusGained( org.eclipse.swt.events.FocusEvent e ) {

getPreviousFields( wJsonField );

}

} );

getPreviousFields( wJsonField );

wJsonField.setEnabled( wIsJson.getSelection() );

Control[] settingsControls = new Control[]{wlBatchSize, wBatchSize, wlStopOnError, wStopOnError, wTimeOut,

wIdInField, wlIsOverwrite, wIsOverwrite, wlUseOutput, wUseOutput, wIdOutField, wlIsJson, wIsJson,

wJsonField};

placeControls( wSettingsGroup, settingsControls );

fdSettingsGroup = new FormData();

fdSettingsGroup.left = new FormAttachment( 0, margin );

fdSettingsGroup.top = new FormAttachment( wIndexGroup, margin );

fdSettingsGroup.right = new FormAttachment( 100, -margin );

wSettingsGroup.setLayoutData( fdSettingsGroup );

}

private void getPreviousFields( LabelComboVar combo ) {

String value = combo.getText();

combo.removeAll();

combo.setItems( getInputFieldNames() );

if ( value != null ) {

combo.setText( value );

}

}

private String[] getInputFieldNames() {

if ( this.fieldNames == null ) {

try {

RowMetaInterface r = transMeta.getPrevStepFields( stepname );

if ( r != null ) {

fieldNames = r.getFieldNames();

}

} catch ( KettleException ke ) {

new ErrorDialog( shell, BaseMessages.getString( PKG, "ElasticSearchBulkDialog.FailedToGetFields.DialogTitle" ),

BaseMessages.getString( PKG, "ElasticSearchBulkDialog.FailedToGetFields.DialogMessage" ), ke );

return new String[0];

}

}

return fieldNames;

}

private void getPreviousFields( TableView table ) {

try {

RowMetaInterface r = transMeta.getPrevStepFields( stepname );

if ( r != null ) {

BaseStepDialog.getFieldsFromPrevious( r, table, 1, new int[]{1, 2}, null, 0, 0, null );

}

} catch ( KettleException ke ) {

new ErrorDialog( shell, BaseMessages.getString( PKG, "System.Dialog.GetFieldsFailed.Title" ), BaseMessages

.getString( PKG, "System.Dialog.GetFieldsFailed.Message" ), ke );

}

}

private void placeControls( Group group, Control[] controls ) {

Control previousAbove = group;

Control previousLeft = group;

for ( Control control : controls ) {

if ( control instanceof Label ) {

addLabelAfter( control, previousAbove );

previousLeft = control;

} else {

addWidgetAfter( control, previousAbove, previousLeft );

previousAbove = control;

previousLeft = group;

}

}

}

private void addWidgetAfter( Control widget, Control widgetAbove, Control widgetLeft ) {

props.setLook( widget );

FormData fData = new FormData();

fData.left = new FormAttachment( widgetLeft, Const.MARGIN );

fData.top = new FormAttachment( widgetAbove, Const.MARGIN );

fData.right = new FormAttachment( 100, -Const.MARGIN );

widget.setLayoutData( fData );

}

private void addLabelAfter( Control widget, Control widgetAbove ) {

props.setLook( widget );

FormData fData = new FormData();

fData.top = new FormAttachment( widgetAbove, Const.MARGIN );

fData.right = new FormAttachment( Const.MIDDLE_PCT, -Const.MARGIN );

widget.setLayoutData( fData );

}

/**

* Read the data from the ElasticSearchBulkMeta object and show it in this dialog.

*

* @param in The ElasticSearchBulkMeta object to obtain the data from.

*/

public void getData( ElasticSearchBulkMeta in ) {

wIndex.setText( Const.NVL( in.getIndex(), "" ) );

wType.setText( Const.NVL( in.getType(), "" ) );

wBatchSize.setText( Const.NVL( in.getBatchSize(), "" + ElasticSearchBulkMeta.DEFAULT_BATCH_SIZE ) );

wStopOnError.setSelection( in.isStopOnError() );

wTimeOut.setText( Const.NVL( in.getTimeOut(), "" ) );

wTimeOut.setTimeUnit( in.getTimeoutUnit() );

wIdInField.setText( Const.NVL( in.getIdInField(), "" ) );

wIsOverwrite.setSelection( in.isOverWriteIfSameId() );

wIsJson.setSelection( in.isJsonInsert() );

wJsonField.setText( Const.NVL( in.getJsonField(), "" ) );

wJsonField.setEnabled( wIsJson.getSelection() ); // listener not working here

wUseOutput.setSelection( in.isUseOutput() );

wIdOutField.setText( Const.NVL( in.getIdOutField(), "" ) );

wIdOutField.setEnabled( wUseOutput.getSelection() ); // listener not working here

// Fields

mapToTableView( model.getFieldsMap(), wFields );

// Servers

for ( ElasticSearchBulkMeta.Server server : model.getServers() ) {

wServers.add( server.address, "" + server.port );

}

wServers.removeEmptyRows();

wServers.setRowNums();

// Settings

mapToTableView( model.getSettingsMap(), wSettings );

wStepname.selectAll();

wStepname.setFocus();

}

private void mapToTableView( Map<String, String> map, TableView table ) {

for ( String key : map.keySet() ) {

table.add( key, map.get( key ) );

}

table.removeEmptyRows();

table.setRowNums();

}

private void cancel() {

stepname = null;

model.setChanged( changed );

dispose();

}

private void ok() {

try {

toModel( model );

} catch ( KettleException e ) {

new ErrorDialog( shell, BaseMessages.getString( PKG, "ElasticSearchBulkDialog.ErrorValidateData.DialogTitle" ),

BaseMessages.getString( PKG, "ElasticSearchBulkDialog.ErrorValidateData.DialogMessage" ), e );

}

dispose();

}

private void toModel( ElasticSearchBulkMeta in ) throws KettleException { // copy info to ElasticSearchBulkMeta

stepname = wStepname.getText();

in.setType( wType.getText() );

in.setIndex( wIndex.getText() );

in.setBatchSize( wBatchSize.getText() );

in.setTimeOut( Const.NVL( wTimeOut.getText(), null ) );

in.setTimeoutUnit( wTimeOut.getTimeUnit() );

in.setIdInField( wIdInField.getText() );

in.setOverWriteIfSameId( StringUtils.isNotBlank( wIdInField.getText() ) && wIsOverwrite.getSelection() );

in.setStopOnError( wStopOnError.getSelection() );

in.setJsonInsert( wIsJson.getSelection() );

in.setJsonField( wIsJson.getSelection() ? wJsonField.getText() : null );

in.setIdOutField( wIdOutField.getText() );

in.setUseOutput( wUseOutput.getSelection() );

in.clearFields();

if ( !wIsJson.getSelection() ) {

for ( int i = 0; i < wFields.getItemCount(); i++ ) {

String[] row = wFields.getItem( i );

if ( StringUtils.isNotBlank( row[0] ) ) {

in.addField( row[0], row[1] );

}

}

}

in.clearServers();

for ( int i = 0; i < wServers.getItemCount(); i++ ) {

String[] row = wServers.getItem( i );

if ( StringUtils.isNotBlank( row[0] ) ) {

try {

in.addServer( row[0], Integer.parseInt( row[1] ) );

} catch ( NumberFormatException nfe ) {

in.addServer( row[0], ElasticSearchBulkMeta.DEFAULT_PORT );

}

}

}

in.clearSettings();

for ( int i = 0; i < wSettings.getItemCount(); i++ ) {

String[] row = wSettings.getItem( i );

in.addSetting( row[0], row[1] );

}

}

private enum TestType {

INDEX, CLUSTER,

}

private void test( TestType testType ) {

try {

ElasticSearchBulkMeta tempMeta = new ElasticSearchBulkMeta();

toModel( tempMeta );

if ( !tempMeta.getServers().isEmpty() ) {

Settings.Builder settingsBuilder = Settings.builder();

settingsBuilder.put( Settings.Builder.EMPTY_SETTINGS );

tempMeta.getSettingsMap().entrySet().stream().forEach( ( s ) -> settingsBuilder.put( s.getKey(), transMeta

.environmentSubstitute( s.getValue() ) ) );

//        try ( PreBuiltTransportClient client = new PreBuiltTransportClient( settingsBuilder.build() ) ) {

//

//        for ( Server server : tempMeta.getServers() ) {

//

//          client.addTransportAddress( new TransportAddress(

//                  InetAddress.getByName( transMeta.environmentSubstitute( server.getAddress() ) ),

//                  server.getPort() ) );

//

//        }

// Collect every configured server into one REST client; creating a client per server would leak the earlier ones.

HttpHost[] hosts = new HttpHost[ tempMeta.getServers().size() ];

int hostCount = 0;

for ( Server server : tempMeta.getServers() ) {

hosts[ hostCount++ ] = new HttpHost( transMeta.environmentSubstitute( server.getAddress() ), server.getPort(), "http" );

}

// try-with-resources closes the client on every exit path, including the early return below.

try ( RestHighLevelClient rclient = new RestHighLevelClient( RestClient.builder( hosts ) ) ) {

//        AdminClient admin = rclient.admin();

switch ( testType ) {

case INDEX:

if ( StringUtils.isBlank( tempMeta.getIndex() ) ) {

showError( BaseMessages.getString( PKG, "ElasticSearchBulk.Error.NoIndex" ) );

break;

}

// First check to see if the index exists (only needed for the index test, so it is done here)

//            IndicesExistsRequestBuilder indicesExistBld = admin.indices().prepareExists( tempMeta.getIndex() );

//            IndicesExistsResponse indicesExistResponse = indicesExistBld.execute().get();

String[] index = tempMeta.getIndex().split( "," );

GetIndexRequest request = new GetIndexRequest();

request.indices( index );

request.local( false );

request.humanReadable( true );

boolean exists = rclient.indices().exists( request, RequestOptions.DEFAULT );

if ( !exists ) {

showError( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.Error.NoIndex" ) );

return;

}

//            RecoveryRequestBuilder indicesBld = rclient.indices().prepareRecoveries( tempMeta.getIndex() );

//            ActionFuture<RecoveryResponse> lafInd = indicesBld.execute();

//            String shards = "" + lafInd.get().getSuccessfulShards() + "/" + lafInd.get().getTotalShards();

showMessage( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.TestIndex.TestOK", "true" ) );

break;

case CLUSTER:

//            ClusterStateRequestBuilder clusterBld = admin.cluster().prepareState();

//            ActionFuture<ClusterStateResponse> lafClu = clusterBld.execute();

//            ClusterStateResponse cluResp = lafClu.actionGet();

//            String name = cluResp.getClusterName().value();

//            ClusterState cluState = cluResp.getState();

//            int numNodes = cluState.getNodes().getSize();

//            showMessage( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.TestCluster.TestOK", name, numNodes ) );

MainResponse response = rclient.info( RequestOptions.DEFAULT );

showMessage( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.TestCluster.TestOK", response.getClusterName(), response.getVersion() ) );

break;

default:

break;

}

}

}else{

showError( "No servers are configured" );

}

} catch ( NoNodeAvailableException | MasterNotDiscoveredException e ) {

showError( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.Error.NoNodesFound" ) );

} catch ( Exception e ) {

showError( e.getLocalizedMessage() );

}

}

private void showError( String message ) {

MessageBox mb = new MessageBox( shell, SWT.OK | SWT.ICON_ERROR );

mb.setMessage( message );

mb.setText( BaseMessages.getString( PKG, "System.Dialog.Error.Title" ) );

mb.open();

}

private void showMessage( String message ) {

MessageBox mb = new MessageBox( shell, SWT.OK | SWT.ICON_INFORMATION );

mb.setMessage( message );

mb.setText( BaseMessages.getString( PKG, "ElasticSearchBulkDialog.Test.TestOKTitle" ) );

mb.open();

}

@Override

public String toString() {

return this.getClass().getName();

}

}
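The server-table handling in toModel above skips rows with a blank address and falls back to ElasticSearchBulkMeta.DEFAULT_PORT when the port cell is not a number. The sketch below isolates that rule using only the JDK so it can be tried outside Spoon. ServerRowParser, HostPort and the defaultPort parameter are hypothetical names, not part of the plugin, and unlike the dialog code this version also trims whitespace. The post does not show the value of DEFAULT_PORT, so it is probably safest to enter the HTTP port explicitly in the Servers tab (9200 is the Elasticsearch default for HTTP), since the REST client talks HTTP rather than the transport protocol.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-alone version of the server-row handling in toModel. */
final class ServerRowParser {

    /** Simple value holder for one parsed row. */
    static final class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /**
     * Converts table rows (address, port) into host/port pairs.
     * Rows with a blank address are skipped; a missing or non-numeric
     * port falls back to defaultPort, mirroring the dialog's behaviour.
     */
    static List<HostPort> parse(String[][] rows, int defaultPort) {
        List<HostPort> result = new ArrayList<>();
        for (String[] row : rows) {
            if (row.length == 0 || row[0] == null || row[0].trim().isEmpty()) {
                continue; // blank address: ignore the row
            }
            int port = defaultPort;
            if (row.length > 1 && row[1] != null) {
                try {
                    port = Integer.parseInt(row[1].trim());
                } catch (NumberFormatException e) {
                    // keep defaultPort, as the dialog does
                }
            }
            result.add(new HostPort(row[0].trim(), port));
        }
        return result;
    }

    public static void main(String[] args) {
        String[][] rows = { { "es1", "9200" }, { "es2", "not-a-port" }, { "", "9200" } };
        for (HostPort hp : parse(rows, 9200)) {
            System.out.println(hp.host + ":" + hp.port);
        }
    }
}
```

The silent fallback is convenient in a dialog, but it also means a typo in the port column goes unnoticed until the connection test fails.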

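7. Go to the pentaho-kettle-8.2.0.0-R directory, open a cmd window, and run mvn clean package -Dmaven.test.skip=true to compile and package. The build will report errors along the way. Check whether pentaho-kettle-8.2.0.0-R\plugins\elasticsearch-bulk-insert\assemblies\plugin\target contains a file named elasticsearch-bulk-insert-plugin-8.2.0.0-342.zip. If it does, the errors can be ignored. If it does not, fix whatever the error messages point to and rerun the command until elasticsearch-bulk-insert-plugin-8.2.0.0-342.zip appears.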

8. Go to the tool's installation directory data-integration\pdi-ce-8.2.0.0-342\data-integration\plugins and delete the elasticsearch-bulk-insert-plugin folder.

9. Unzip elasticsearch-bulk-insert-plugin-8.2.0.0-342.zip and move the extracted elasticsearch-bulk-insert-plugin folder into data-integration\pdi-ce-8.2.0.0-342\data-integration\plugins.

10. Go to the installation directory data-integration\pdi-ce-8.2.0.0-342\data-integration and run Spoon.bat.

11. The Kettle Elasticsearch plugin is now ready to use.
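Steps 7 to 9 (check that the zip was built, delete the old plugin folder, unpack the new one) can also be scripted. The following is a minimal JDK-only sketch, not something shipped with Kettle: the class PluginInstallSketch and its method names are hypothetical, and it assumes the zip contains a top-level elasticsearch-bulk-insert-plugin folder, as step 9 implies. Close Spoon before running it so no plugin jar is locked.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

/** Hypothetical helper (not part of Kettle) that mirrors steps 7 to 9. */
final class PluginInstallSketch {

    static final String PLUGIN_DIR = "elasticsearch-bulk-insert-plugin";

    /** Step 7: the location where the build is expected to leave the plugin zip. */
    static Path expectedZip(Path sourceRoot, String version) {
        return sourceRoot.resolve("plugins").resolve("elasticsearch-bulk-insert")
                .resolve("assemblies").resolve("plugin").resolve("target")
                .resolve(PLUGIN_DIR + "-" + version + ".zip");
    }

    /** Resolves a zip entry under targetDir and rejects names that would escape it. */
    static Path resolveSafely(Path targetDir, String entryName) {
        Path base = targetDir.toAbsolutePath().normalize();
        Path resolved = base.resolve(entryName).normalize();
        if (!resolved.startsWith(base)) {
            throw new IllegalArgumentException("Entry escapes target directory: " + entryName);
        }
        return resolved;
    }

    /** Step 8: delete the old plugin folder if present, children before parents. */
    static void deleteRecursively(Path dir) {
        if (!Files.exists(dir)) {
            return;
        }
        try (Stream<Path> walk = Files.walk(dir)) {
            walk.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Step 9: unpack the plugin zip into the plugins directory. */
    static void unzip(Path zipFile, Path pluginsDir) {
        try (ZipInputStream zin = new ZipInputStream(Files.newInputStream(zipFile))) {
            ZipEntry entry;
            while ((entry = zin.getNextEntry()) != null) {
                Path out = resolveSafely(pluginsDir, entry.getName());
                if (entry.isDirectory()) {
                    Files.createDirectories(out);
                } else {
                    Files.createDirectories(out.getParent());
                    Files.copy(zin, out, StandardCopyOption.REPLACE_EXISTING);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println("usage: PluginInstallSketch <kettle-source-root> <data-integration/plugins>");
            return;
        }
        Path zip = expectedZip(Paths.get(args[0]), "8.2.0.0-342");
        if (!Files.isRegularFile(zip)) {
            System.err.println("Plugin zip not found, the build has not produced it yet: " + zip);
            return;
        }
        Path pluginsDir = Paths.get(args[1]);
        deleteRecursively(pluginsDir.resolve(PLUGIN_DIR));
        unzip(zip, pluginsDir);
        System.out.println("Installed " + PLUGIN_DIR + " into " + pluginsDir);
    }
}
```

Checking for the zip rather than the Maven exit code matches the advice in step 7: the build may fail elsewhere while still producing the plugin package.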

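III. Notes

1. An index can have only one type.

2. The Settings tab does not need to be configured (the original post shows a screenshot here).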
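Note 1 follows from Elasticsearch 7 deprecating mapping types, so a bulk request no longer needs a type in its action line. As an illustration only, the JDK-only sketch below builds the newline-delimited body of a bulk request without any _type field. BulkBodySketch and its methods are hypothetical and are not how the plugin sends data (the plugin goes through the REST high-level client); the documents are assumed to be already serialized as single-line JSON.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of a typeless bulk body; not the plugin's code. */
final class BulkBodySketch {

    /** Minimal JSON string quoting: escapes quotes, backslashes and control characters. */
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    /** Action line for one document: index name and optional id, no type. */
    static String actionLine(String index, String id) {
        StringBuilder sb = new StringBuilder("{\"index\":{\"_index\":").append(quote(index));
        if (id != null && !id.isEmpty()) {
            sb.append(",\"_id\":").append(quote(id));
        }
        return sb.append("}}").toString();
    }

    /** Bulk body: one action line plus one source line per document, each newline-terminated. */
    static String bulkBody(String index, Map<String, String> jsonById) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : jsonById.entrySet()) {
            sb.append(actionLine(index, e.getKey())).append('\n');
            sb.append(e.getValue()).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> docs = new LinkedHashMap<>();
        docs.put("1", "{\"msg\":\"hello\"}");
        docs.put("2", "{\"msg\":\"world\"}");
        System.out.print(bulkBody("logs", docs));
    }
}
```

The index name "logs" and the sample documents are placeholders chosen for the example.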
