/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.mahout.cf.taste.hadoop.svd;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
import org.apache.mahout.cf.taste.hadoop.ToItemPrefsMapper;
import org.apache.mahout.cf.taste.hadoop.item.ToUserVectorReducer;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.hadoop.decomposer.DistributedLanczosSolver;
import org.apache.mahout.math.hadoop.decomposer.EigenVerificationJob;
/**
* <p>DistributedSVDRecommenderJob implements a basic distributed SVD-based recommendation algorithm as a
* number of mapreduces.</p>
*
* <p>Command line arguments taken by this class are:</p>
* <ol>
* <li>-Dmapred.input.dir=(path) or --input: Path to directory containing a text file that contains user/item prefs
* in the format userID,itemID,pref one set per line</li>
* <li>-Dmapred.output.dir=(path) or --output: Path to directory where overall class output should go</li>
* <li>--numRecommendations : Number of recommendations to be made per user</li>
* <p>Arguments that are passed solely to {@link DistributedLanczosSolver}</p>
* <li>--numRows : Number of Rows/Users of input Matrix</li>
* <li>--numCols : Number of Columns/Items of input Matrix</li>
* <li>--symmetric : Is input matrix symmetric</li>
* <li>--rank : ranks of desired decomposition</li>
* </ol>
*
* <p>General command line options are documented in {@link AbstractJob}.</p>
*
*
* <p>Walkthrough of Algorithm</p>
*
* <p>Initial Input : Text file containing user/item prefs one set per line in the format
* userID,itemID,pref </p>
*
* <p>This initial input is then converted through a M/R into a {@link SequenceFile} containing
* {@link VarLongWritable}/{@link VectorWritable} pairs representing a User*Item Matrix(ie input
* Matrix A).</p>
*
* <p>From here {@link DistributedLanczosSolver} is passed the SequenceFile to solve the Matrix SVD in
* a series of M/R.</p>
*
* <p>The output from the {@link DistributedLanczosSolver} is then given to {@link EigenVerificationJob}
* which checks and cleans it.</p>
*
 * <p>The cleaned SVD Matrix V is then used by {@link SVDPredictionJob} along with the initial input
* Matrix A to calculate through a number of M/R the prediction matrix from which the user recommendations
* can be made.</p>
*
* <p></p>
*/
public class DistributedSVDRecommenderJob extends AbstractJob {
@Override
public int run(String[] args) throws Exception {
//add argument options
addInputOption();
addOutputOption();
addOption("numRecommendations", "n", "Number of recommendations per user", "10");
addOption("numRows","nr","Number of rows of matrix");
addOption("numCols","nc","Number of columns of matrix");
addOption("symmetric","s","Is the matrix symmetric", "false");
addOption("rank","r","Rank of the decomposition", "10");
//deal with arguments
Map<String,String> parsedArgs = parseArguments(args);
if (parsedArgs == null) {
return -1;
}
Path inputPath = getInputPath();
Path outputPath = getOutputPath();
Path tempDirPath = new Path(parsedArgs.get("--tempDir"));
String numRows = parsedArgs.get("--numRows");
String numCols = parsedArgs.get("--numCols");
String isSymmetric = parsedArgs.get("--symmetric");
String rank = parsedArgs.get("--rank");
String recommendationsPerUser = parsedArgs.get("--numRecommendations");
//Set up temp directories for the individual jobs
String userVectorPath = tempDirPath + "/userVector";
String cardinalityCorrectionPath = tempDirPath + "/cardinalityCorrection";
String distributedLanczozSolverPath = tempDirPath + "/distributedLanczozSolver";
String eigenVerificationPath = tempDirPath + "/eigenVerification";
String svdPredictionPath = tempDirPath + "/svdPrediction/productWith";
//construct args for DistributedLanczosSolver
String[] argsDLS = {
"-Dmapred.input.dir=" + cardinalityCorrectionPath,
"-Dmapred.output.dir=" + distributedLanczozSolverPath + "/part-0000",
"--tempDir",tempDirPath + "/tempDLS",
"--numRows",numRows,
"--numCols",numCols,
"--symmetric",isSymmetric,
"--rank", rank};
//construct args for EigenVerificationJob
String[] argsEVJ = {
"--eigenInput", distributedLanczozSolverPath,
"--output",eigenVerificationPath
,"--corpusInput",cardinalityCorrectionPath
,"--tempDir",tempDirPath + "/tempEigenVerification"};
//construct args for PredictionJob
/*String[] argsSVDP = {
"--matrixVInput",eigenVerificationPath,
"--matrixAInput",cardinalityCorrectionPath,
"--tempDir",tempDirPath + "/tempSVDP",
"--output",svdPredictionPath + "/part-r-00000"};
*/
String[] argsSVDP = {
"--matrixVInput",eigenVerificationPath,
"--matrixAInput",cardinalityCorrectionPath,
"--numRowsA",numRows,
"--numColsA",numCols,
"--numRowsV",rank,
"--numColsV",numCols,
"--tempDir",tempDirPath + "/tempSVDP",
"--output",svdPredictionPath};
//Job to convert input into vectors
Job toUserVectorJob = prepareJob(inputPath, new Path(userVectorPath), TextInputFormat.class,
ToItemPrefsMapper.class, VarLongWritable.class, EntityPrefWritable.class,
ToUserVectorReducer.class, VarLongWritable.class, VectorWritable.class,
SequenceFileOutputFormat.class);
toUserVectorJob.waitForCompletion(true);
//Job that makes recommendations in single mapper, using default reducer as identity reducer
Job cardinalityCorrectionJob = prepareJob(new Path(userVectorPath), new Path(cardinalityCorrectionPath),
SequenceFileInputFormat.class,
CardinalityCorrectionMapper.class, IntWritable.class, VectorWritable.class,
Reducer.class, IntWritable.class, VectorWritable.class,
SequenceFileOutputFormat.class);
cardinalityCorrectionJob.getConfiguration().setInt(CardinalityCorrectionMapper.NUM_COLS, Integer.parseInt(numCols));
cardinalityCorrectionJob.waitForCompletion(true);
//Take output of previous job and use DistributedLanczosSolver to do the SVD calculations
ToolRunner.run(new DistributedLanczosSolver().job(), argsDLS);
//Take the output of the DistributedLanczosSolver and use the EigenVerificationJob to verify it
ToolRunner.run(new EigenVerificationJob(), argsEVJ);
//Use output of EigenVerificationJob to calculate the predicted pref for each user/item pair
ToolRunner.run(new SVDPredictionJob(), args
// NOTE(review): source is truncated here — the remainder of run() (closing the
// ToolRunner.run(new SVDPredictionJob(), ...) call, any recommendation-output job,
// the return statement, and the closing braces of run() and the class) is missing.
// The lines previously here were web-extraction pagination residue, not code.