deepak_naik

Bulk Query with PK chunking enabled: where is the original batch when we read the BatchInfo[] from the Bulk Job Id?

For bulk query jobs with PK chunking enabled, will the original batch (the one that contains the query when the subsequent batches are created) be at index 0 of the BatchInfo list?

For example if we have the following
BatchInfo[] bListInfo = bulkConnection.getBatchInfoList(job.getId()).getBatchInfo();

Will the original query batch always be at bListInfo[0]? If not, how can this be ensured?
Best Answer chosen by deepak_naik
deepak_naik
The solution has been found.
The batch returned by the createBatchFromStream() API is the "original" batch, and we need to wait for its state to become BatchStateEnum.NotProcessed. Per the documentation, that state on the original batch means the subsequent (chunked) batches have been created; each of those batches must then be monitored to completion individually.
The doBulkQuery() method is as follows:
 
public void doBulkQuery(BulkConnection bulkConnection) {
    // job, querySOQL, bListInfo, finalBatchCount, numberOfBatchesForQueryExtract,
    // sleepTimeForBatchStatusLoop, setChunkSize, numberOfRecordsExtracted and
    // startTime are instance fields of the enclosing class.
    try {
        job.setObject(ObjectName);
        job.setOperation(OperationEnum.query);
        job.setConcurrencyMode(ConcurrencyMode.Parallel);
        job.setContentType(ContentType.CSV);

        job = bulkConnection.createJob(job);
        assert job.getId() != null;
        System.out.println("Job id is " + job.getId());

        // The batch created here is the "original" batch that holds the query.
        ByteArrayInputStream queryStream = new ByteArrayInputStream(querySOQL.getBytes("UTF-8"));
        BatchInfo pkChunkBatchInfo = bulkConnection.createBatchFromStream(job, queryStream);

        bListInfo = bulkConnection.getBatchInfoList(job.getId()).getBatchInfo();

        // Poll (at most ~10 minutes) until the original batch reaches NotProcessed,
        // which signals that the chunked batches have been created.
        int iBatchSleep = 0;
        while (pkChunkBatchInfo.getState() != BatchStateEnum.NotProcessed) {
            Thread.sleep(1000);
            pkChunkBatchInfo = bulkConnection.getBatchInfo(job.getId(), pkChunkBatchInfo.getId());
            iBatchSleep++;
            if (iBatchSleep > 600) {
                break;
            }
        }

        bListInfo = bulkConnection.getBatchInfoList(job.getId()).getBatchInfo();
        BatchInfo info = bListInfo[finalBatchCount];
        numberOfBatchesForQueryExtract++;

        String[] queryResults = null;

        for (int i = 0; i < 10000; i++) {
            info = bulkConnection.getBatchInfo(job.getId(), info.getId());

            if (info.getState() == BatchStateEnum.Completed) {
                QueryResultList list = bulkConnection.getQueryResultList(job.getId(), info.getId());
                queryResults = list.getResult();
                break;
            } else if (info.getState() == BatchStateEnum.Failed) {
                System.out.println("-------------- failed ----------" + info);
                break;
            } else if (info.getState() == BatchStateEnum.NotProcessed) {
                // The batch won't be processed. If the job has PK chunking enabled,
                // this state is assigned to the original batch that contains the
                // query when the subsequent batches are created.
                break;
            } else {
                System.out.println("-------------- waiting ----------" + info);
            }

            Thread.sleep(Integer.parseInt(sleepTimeForBatchStatusLoop) * 1000);
        }

        if (queryResults != null) {
            for (String resultId : queryResults) {
                InputStream resultStream = bulkConnection.getQueryResultStream(job.getId(), info.getId(), resultId);
                BufferedReader lineReader = new BufferedReader(new InputStreamReader(resultStream, "UTF-8"));

                // The first line is either the CSV header or the "no records" marker.
                String lineString = lineReader.readLine();
                if (lineString == null || lineString.equalsIgnoreCase("Records not found for this query")) {
                    System.out.println("Batch " + info.getId() + " - Records not found for this query");
                    lineReader.close();
                    continue;
                }

                while ((lineString = lineReader.readLine()) != null) {
                    System.out.println("lineString : " + lineString);
                    numberOfRecordsExtracted++;
                }
                lineReader.close();
            }
        }
    } catch (AsyncApiException aae) {
        aae.printStackTrace();
    } catch (IOException e) {
        // Also covers the UnsupportedEncodingException from getBytes("UTF-8")
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    // Read the remaining batches
    while (!isQueryDone()) {
        getMoreData();
    }

    // Print results
    System.out.println("Job id is " + job.getId());
    System.out.println("Number of batches created with chunkSize " + setChunkSize + " is " + numberOfBatchesForQueryExtract);
    System.out.println("Number of total records extracted is " + numberOfRecordsExtracted);
    long finishTime = System.nanoTime();
    long elapsedTime = (finishTime - startTime) / 1000000; // nanoseconds -> milliseconds
    int seconds = (int) (elapsedTime / 1000) % 60;
    int minutes = (int) ((elapsedTime / (1000 * 60)) % 60);
    int hours = (int) ((elapsedTime / (1000 * 60 * 60)) % 24);
    System.out.println("Time for processing is " + hours + ":" + minutes + ":" + seconds);
}
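The elapsed-time arithmetic at the end of the method can also be written with java.util.concurrent.TimeUnit, which makes the unit conversions explicit. A standalone sketch (the class and method names here are illustrative, not part of the code above):

```java
import java.util.concurrent.TimeUnit;

public class ElapsedTime {
    // Convert a duration in nanoseconds to "H:M:S", mirroring the manual
    // divisions by 1000, 1000*60 and 1000*60*60 in doBulkQuery().
    static String format(long elapsedNanos) {
        long totalSeconds = TimeUnit.NANOSECONDS.toSeconds(elapsedNanos);
        long hours = (totalSeconds / 3600) % 24;
        long minutes = (totalSeconds / 60) % 60;
        long seconds = totalSeconds % 60;
        return hours + ":" + minutes + ":" + seconds;
    }

    public static void main(String[] args) {
        // 1 hour, 2 minutes, 3 seconds expressed in nanoseconds
        long nanos = TimeUnit.SECONDS.toNanos(3723);
        System.out.println(format(nanos)); // prints 1:2:3
    }
}
```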

 

All Answers

deepak_naik

The Salesforce documentation says:
"Not Processed: The batch won’t be processed. This state is assigned when a job is aborted while the batch is queued. For bulk queries, if the job has PK chunking enabled, this state is assigned to the original batch that contains the query when the subsequent batches are created. After the original batch is changed to this state, you can monitor the subsequent batches and retrieve each batch’s results when it’s completed. Then you can safely close the job"

But how do I identify this original batch from the list of batches?
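Since the ordering of getBatchInfoList() isn't guaranteed, the reliable markers are (a) the BatchInfo returned by createBatchFromStream(), or (b) the single batch whose state becomes NotProcessed. A minimal sketch of option (b), written over plain (id, state) pairs so it stands alone without the WSC classes (the ids below are made-up stand-ins for BatchInfo.getId() and BatchInfo.getState()):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class OriginalBatchFinder {
    // With PK chunking, exactly one batch in the job ends up in the
    // "NotProcessed" state: the original batch that held the query.
    static String findOriginalBatchId(List<Map.Entry<String, String>> batches) {
        for (Map.Entry<String, String> b : batches) {
            if ("NotProcessed".equals(b.getValue())) {
                return b.getKey();
            }
        }
        return null; // original batch may still be Queued; poll again later
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> batches = Arrays.asList(
            new SimpleEntry<>("751x00000000001", "NotProcessed"), // original
            new SimpleEntry<>("751x00000000002", "Completed"),
            new SimpleEntry<>("751x00000000003", "InProgress"));
        System.out.println(findOriginalBatchId(batches)); // prints 751x00000000001
    }
}
```

Note the null return: the original batch only transitions to NotProcessed once the chunked batches have been created, so the lookup must be retried until it succeeds.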
This was selected as the best answer (shown in full above).