CSCI4061 - Introduction to Operating Systems - Project #2 - IPC-based Map Reduce

1      Purpose
In Project 1, we built a simple version of MapReduce using operating system primitives such as fork, exec, and wait. Several utility functions were provided to help you implement the map and reduce tasks. In this project, you will implement these utility functions yourself using inter-process communication (IPC) system calls such as msgget, msgsnd, msgrcv, and msgctl. You should work in groups[1] of 3, as in Project 1. Please adhere to the output formats provided in each section.
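As a brief refresher on these system calls, the following is a minimal, self-contained round trip through a System V message queue; the key, permissions, and message layout here are illustrative and are not taken from the code template.

#include <stdio.h>
#include <string.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>

struct demo_msg { long mtype; char mtext[64]; };

int main(void) {
    key_t key = ftok(".", 'D');                 /* illustrative key */
    int qid = msgget(key, 0600 | IPC_CREAT);    /* create/open the queue */
    if (qid == -1) { perror("msgget"); return 1; }

    struct demo_msg out = { .mtype = 1 };       /* remaining bytes zeroed */
    strcpy(out.mtext, "hello");
    msgsnd(qid, &out, sizeof(out.mtext), 0);    /* enqueue one message */

    struct demo_msg in;
    msgrcv(qid, &in, sizeof(in.mtext), 1, 0);   /* dequeue by mtype == 1 */
    printf("received: %s\n", in.mtext);

    msgctl(qid, IPC_RMID, NULL);                /* remove the queue */
    return 0;
}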

2       Problem Statement
In this project, we will revisit the single-machine MapReduce designed for the word count application[2] in Project 1. There are four phases: Master, Map, Shuffle, and Reduce. In the Master phase, the input text file is taken from the command line. The master splits the input file into chunks of at most 1024 bytes and distributes them uniformly among the mapper processes. In the Map phase, each mapper tokenizes the text chunk received from the master and writes the <word 1 1 1...> information to word.txt files. Once the mappers complete, the master calls the Shuffle phase to partition the word.txt files for the reducers. The files are partitioned across the reducers based on a hash function: partitioning essentially allocates specific non-overlapping key ranges (i.e., words in our case) to specific reducers to share the load. Once the partitioning is complete, the word.txt file paths are shared with the Reduce phase, and the main program spawns the reducer processes to carry out the final word count.
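Every phase in the pseudocode below opens "the message queue", so the C sketches in this document assume one shared System V queue and the following hypothetical message layout and helper. The names CHUNK_SIZE, ACK_MTYPE, queue_msg, and open_queue are assumptions made for illustration; the code template defines its own types and constants.

#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>

#define CHUNK_SIZE 1024          /* maximum chunk size from the spec */
#define ACK_MTYPE  9999L         /* hypothetical reserved mtype for ACKs to the master */

/* One message on the queue: mtype addresses a specific mapper or reducer
   (or carries ACK_MTYPE for replies to the master); mtext carries a text
   chunk, a word-file path, or an "END"/"ACK" marker. */
struct queue_msg {
    long mtype;
    char mtext[CHUNK_SIZE + 1];
};

/* Open (creating if needed) the single shared queue; the key is illustrative. */
static int open_queue(void) {
    key_t key = ftok(".", 'M');
    return msgget(key, 0600 | IPC_CREAT);
}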

Objective: You will design and implement portions of the Master, Map, Shuffle, and Reduce phases. A code template will be provided to you. You are free to reuse portions of your implemented code from Project 1, or simply start from your Project 1 solution.

3        Functions to implement
In this section, we discuss the details of the functions you are supposed to implement. Please refer to Project 1 for a detailed description of each of the four phases of MapReduce. We use END and ACK (acknowledgement) messages to mark the end of each phase so that the processes involved can move on to their next phases.

3.1      sendChunkData
The Master phase uses the sendChunkData function to distribute chunks of the input file to the mappers in a round-robin fashion. Refer to Algorithm 1 for details.

File: src/utils.c
 

Algorithm 1: sendChunkData()

Input: (String inputFile, Integer nMappers)
    inputFile: text file to be sent, nMappers: number of mappers

// open message queue
messageQueue ← openMessageQueue();
// construct chunks of at most 1024 bytes each and send each chunk to a mapper
// in a round-robin fashion
while inputFile has remaining text do
    chunk ← getNextChunk(inputFile);
    messageSend(messageQueue, chunk, mapperID);
end
// send END message to mappers
for each mapperID do
    messageSend(messageQueue, EndMessage, mapperID);
end
// wait for ACK from the mappers for the END notification
for each mapper do
    wait(messageQueue);
end
// close the message queue
close(messageQueue);
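One possible shape for sendChunkData() in C, reusing the hypothetical queue_msg, CHUNK_SIZE, ACK_MTYPE, and open_queue() definitions sketched in Section 2; the END/ACK string markers are also assumptions, not the template's protocol. For clarity this version reads fixed-size chunks and omits the word-boundary adjustment described in the Tip below.

#include <stdio.h>
#include <string.h>
#include <sys/msg.h>

void sendChunkData(char *inputFile, int nMappers) {
    int qid = open_queue();                      /* hypothetical helper from Section 2 */
    FILE *fp = fopen(inputFile, "r");
    if (qid == -1 || fp == NULL) return;

    struct queue_msg msg;
    int mapperID = 1;
    size_t n;

    /* Read up to CHUNK_SIZE bytes at a time and hand each chunk to the
       next mapper in round-robin order. */
    while ((n = fread(msg.mtext, 1, CHUNK_SIZE, fp)) > 0) {
        msg.mtext[n] = '\0';
        msg.mtype = mapperID;
        msgsnd(qid, &msg, sizeof(msg.mtext), 0);
        mapperID = (mapperID % nMappers) + 1;
    }
    fclose(fp);

    /* Tell every mapper that no more chunks are coming. */
    for (int id = 1; id <= nMappers; id++) {
        msg.mtype = id;
        strcpy(msg.mtext, "END");
        msgsnd(qid, &msg, sizeof(msg.mtext), 0);
    }

    /* Collect one ACK per mapper before moving on; System V queues persist
       until msgctl(IPC_RMID), so there is nothing to close per process. */
    for (int i = 0; i < nMappers; i++)
        msgrcv(qid, &msg, sizeof(msg.mtext), ACK_MTYPE, 0);
}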


Notice: The code for the bookeepingCode(), spawnMappers(), spawnReducers(), and waitForAll() functions used by the master is already provided in the code template.


Tip: While constructing a 1024-byte chunk, the 1024th byte may fall somewhere in the middle of a word, which would leave that word split across two chunks. Therefore, cut the chunk at the end of the previous word so that no word gets split.
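A minimal sketch of that adjustment, assuming the chunk was just read from a FILE* into a buffer of capacity cap (1024 here, with one extra byte for the terminator) and that the partial word can simply be pushed back with fseek(); the helper name is hypothetical.

#include <ctype.h>
#include <stdio.h>

/* Trim a freshly read chunk at the last whitespace and rewind the stream so
   the cut-off word begins the next chunk. buf holds n bytes read from fp;
   returns the adjusted chunk length. */
static size_t trim_to_word_boundary(char *buf, size_t n, size_t cap, FILE *fp) {
    size_t end = n;
    if (n == cap && !isspace((unsigned char)buf[n - 1])) {
        while (end > 0 && !isspace((unsigned char)buf[end - 1]))
            end--;
        if (end > 0)
            fseek(fp, -(long)(n - end), SEEK_CUR);   /* unread the partial word */
        else
            end = n;   /* a single "word" longer than the chunk: keep it whole */
    }
    buf[end] = '\0';
    return end;
}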


To-do: You are supposed to implement the sendChunkData() function.

3.2      getChunkData
Each mapper in the Map phase calls the getChunkData function to receive the text chunks from the master process. Refer to Algorithm 2 for details.

File: src/utils.c
 

Algorithm 2: getChunkData()

Input: (Integer mapperID)
    mapperID: mapper's id assigned by the master ∈ {1, 2, ..., nMappers}
Result: chunkData, the chunk data received from the master

// open message queue
messageQueue ← openMessageQueue();
// receive chunk from the master
chunkData ← messageReceive(messageQueue, mapperID);
// check for END message and send ACK to master
if chunkData == EndMessage then
    messageSend(messageQueue, ACK, master);
end
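One way getChunkData() could look, again reusing the hypothetical Section 2 definitions; returning the chunk as a heap copy and NULL on END is an assumed calling convention, not one fixed by the template.

#include <stdlib.h>
#include <string.h>
#include <sys/msg.h>

char *getChunkData(int mapperID) {
    int qid = open_queue();                        /* hypothetical helper */
    struct queue_msg msg;

    /* Block until a message addressed to this mapper (mtype == mapperID) arrives. */
    if (msgrcv(qid, &msg, sizeof(msg.mtext), mapperID, 0) == -1)
        return NULL;

    /* END means the master has no more chunks: acknowledge and stop. */
    if (strcmp(msg.mtext, "END") == 0) {
        msg.mtype = ACK_MTYPE;
        strcpy(msg.mtext, "ACK");
        msgsnd(qid, &msg, sizeof(msg.mtext), 0);
        return NULL;
    }
    return strdup(msg.mtext);                      /* caller frees the chunk */
}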

3.3     shuffle
Once all the mapper processes complete and terminate, the master process calls shuffle(). The shuffle function divides the word.txt files in the output/MapOut/Map_mapperID folders across the nReducers reducers and sends the file paths to each reducer based on a hash function.

The flow of control in shuffle is given in Algorithm 3.

File: src/utils.c
 

Algorithm 3: shuffle()

Input: (Integer nMappers, Integer nReducers)
    nMappers: #mappers, nReducers: #reducers

// open message queue
messageQueue ← openMessageQueue();
// traverse the directory of each mapper and send the word file paths to the reducers
for each mapper do
    for each wordFileName in mapOutDir do
        // select the reducer using a hash function
        reducerId ← hashFunction(wordFileName, nReducers)∗;
        // send word file path to reducer
        messageSend(messageQueue, wordFilePath, reducerId);
    end
end
// send END message to reducers
for each reducerId do
    messageSend(messageQueue, EndMessage, reducerId);
end
// wait for ACK from the reducers for the END notification
for each reducer do
    wait(messageQueue);
end
// close the message queue
close(messageQueue);
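A sketch of how the shuffle() loop could be laid out, reusing the hypothetical Section 2 definitions. The directory naming follows the output/MapOut/Map_mapperID convention stated above; hashFunction() stands for the provided helper, whose exact signature may differ, and the sketch assumes it yields an id in 1..nReducers that can be used directly as a positive mtype.

#include <dirent.h>
#include <stdio.h>
#include <string.h>
#include <sys/msg.h>

int hashFunction(char *wordFileName, int nReducers);    /* provided by the template */

void shuffle(int nMappers, int nReducers) {
    int qid = open_queue();                             /* hypothetical helper */
    struct queue_msg msg;
    char dirName[64];

    /* Walk every mapper's output directory and route each word file to the
       reducer chosen by the hash of its file name. */
    for (int m = 1; m <= nMappers; m++) {
        snprintf(dirName, sizeof(dirName), "output/MapOut/Map_%d", m);
        DIR *dir = opendir(dirName);
        if (dir == NULL) continue;

        struct dirent *entry;
        while ((entry = readdir(dir)) != NULL) {
            if (entry->d_name[0] == '.') continue;      /* skip . and .. */
            int reducerID = hashFunction(entry->d_name, nReducers);
            msg.mtype = reducerID;                      /* assumed to be in 1..nReducers */
            snprintf(msg.mtext, sizeof(msg.mtext), "%s/%s", dirName, entry->d_name);
            msgsnd(qid, &msg, sizeof(msg.mtext), 0);
        }
        closedir(dir);
    }

    /* END to every reducer, then one ACK back from each. */
    for (int r = 1; r <= nReducers; r++) {
        msg.mtype = r;
        strcpy(msg.mtext, "END");
        msgsnd(qid, &msg, sizeof(msg.mtext), 0);
    }
    for (int r = 0; r < nReducers; r++)
        msgrcv(qid, &msg, sizeof(msg.mtext), ACK_MTYPE, 0);
}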

 

Notice: ∗ The code for the hashFunction() function is already provided in the code template.

To-do: You are supposed to implement the rest of the shuffle() function.

3.4      getInterData
Each reducer uses the getInterData function to retrieve the paths of the word files for which it has to perform the reduce operation and compute the total count. Refer to Algorithm 4 for details.

File: src/utils.c
 

Algorithm 4: getInterData()

Input: (String wordFileName, Integer reducerID)
    wordFileName: placeholder for storing the word file path received from the master,
    reducerID: reducer's id assigned by the master ∈ {1, 2, ..., nReducers}
Result: wordFileName holds the word file path received from the master

// open message queue
messageQueue ← openMessageQueue();
// receive data from the master
wordFileName ← messageReceive(messageQueue, reducerID);
// check for END message and send ACK to master
if wordFileName == EndMessage then
    messageSend(messageQueue, ACK, master);
end
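A possible getInterData() counterpart, mirroring getChunkData() and again based on the hypothetical Section 2 definitions; copying the END marker into wordFileName so the reducer's loop can detect termination is an assumption about the caller, not something the template dictates.

#include <string.h>
#include <sys/msg.h>

void getInterData(char *wordFileName, int reducerID) {
    int qid = open_queue();                        /* hypothetical helper */
    struct queue_msg msg;

    /* Block until a word-file path addressed to this reducer arrives. */
    if (msgrcv(qid, &msg, sizeof(msg.mtext), reducerID, 0) == -1)
        return;

    /* Hand the received path (or the END marker) back to the caller. */
    strcpy(wordFileName, msg.mtext);

    /* On END, acknowledge so the master can move on. */
    if (strcmp(wordFileName, "END") == 0) {
        msg.mtype = ACK_MTYPE;
        strcpy(msg.mtext, "ACK");
        msgsnd(qid, &msg, sizeof(msg.mtext), 0);
    }
}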

Notice: * The code for the reduce() and writeFinalDS() functions used by the reducer is already provided in the code template.


To-do: You are supposed to implement the getInterData() function.


Note:

•    The master process sends an END message to each mapper to inform it that the transfer of chunks is complete (in the sendChunkData() function). Each mapper, in turn, sends an ACK message to the master acknowledging the receipt of all chunks (in the getChunkData() function). Once the master and mapper processes have exchanged END and ACK messages, they move on to their next phases.

•    The master process also sends an END message to each reducer to inform it that the sending of intermediate word file paths is complete (in the shuffle() function). Each reducer, in turn, sends an ACK message to the master acknowledging the receipt of all file paths (in the getInterData() function). Once the master and reducer processes have exchanged END and ACK messages, they move on to their next phases.

4          Compile and Execute

Please refer to Project 1.

5          Expected Output

Please refer to Project 1.

6          Testing

Please refer to Project 1.

