温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

MiniYARNCluster   MiniDFSCluster Kerberos

发布时间:2020-07-21 18:22:03 来源:网络 阅读:1974 作者:r7raul 栏目:大数据

/**

1. download hadoop source code  ,then yum install cmake zlib-devel,  finally ,mvn clean package -Pdist,native -DskipTests -Dtar to create native lib <br>

2. before run junit you shuold specify vm argument  -Djava.library.path=/usr/hadoop{version}/hadoop-dist/target/hadoop-{version}/lib/native <br>

3. use mvn clean install -Pnative -Dcontainer-executor.conf.dir=/{project.path}/bin   -DskipTests  to create container-executor <br>

   an example container-executor.cfg existe in {project.path}/bin <br>

   After that it will create  container-executor exe file <br>

   cp this file to {project.path}/bin  <br>

   sudo chown root:yourusername {project.path}/bin/container-executor in this example yourusername is tjj <br>

   sudo chmod 4550 {project.path}/bin/container-executor<br>

 4. If you want to run testjob in YARN(conf.set("mapreduce.framework.name", "yarn")),you should modify META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider <Br>

   to #org.apache.hadoop.mapred.LocalClientProtocolProvider<br>

      org.apache.hadoop.mapred.YarnClientProtocolProvider<br>

   in hadoop-mapreduce-client-common-{version}.jar which exist in your classpath


 */

import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;

import static org.apache.hadoop.hdfs.DFSConfigKeys.*;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;

import static org.junit.Assert.*;


import java.io.*;

import java.util.ArrayList;

import java.util.List;

import java.util.Properties;


import org.apache.commons.io.FileUtils;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.*;

import org.apache.hadoop.hdfs.HdfsConfiguration;

import org.apache.hadoop.hdfs.MiniDFSCluster;

import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferTestCase;

import org.apache.hadoop.http.HttpConfig;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.minikdc.MiniKdc;

import org.apache.hadoop.security.SecurityUtil;

import org.apache.hadoop.security.UserGroupInformation;

import org.apache.hadoop.security.ssl.KeyStoreTestUtil;

import org.apache.hadoop.yarn.conf.YarnConfiguration;

import org.apache.hadoop.yarn.server.MiniYARNCluster;

import org.junit.*;


public class TestClusterWithKerberos {



    private static File baseDir;

    private static String hdfsPrincipal;

    private static MiniKdc kdc;

    private static String keytab;

    private static String spnegoPrincipal;

    private MiniYARNCluster yarnCluster;

    private MiniDFSCluster cluster;



    @BeforeClass

    public static void initKdc() throws Exception {

        baseDir = new File(System.getProperty("test.build.dir", "target/test-dir"),

                SaslDataTransferTestCase.class.getSimpleName());

        FileUtil.fullyDelete(baseDir);

        assertTrue(baseDir.mkdirs());


        Properties kdcConf = MiniKdc.createConf();

        kdc = new MiniKdc(kdcConf, baseDir);

        kdc.start();

        UserGroupInformation ugi = UserGroupInformation.createRemoteUser("tjj");

        UserGroupInformation.setLoginUser(ugi);

        String userName = UserGroupInformation.getLoginUser().getShortUserName();

        File keytabFile = new File(baseDir, userName + ".keytab");

        keytab = keytabFile.getAbsolutePath();

        kdc.createPrincipal(keytabFile, userName + "/localhost", "HTTP/localhost");

        hdfsPrincipal = userName + "/localhost@" + kdc.getRealm();

        spnegoPrincipal = "HTTP/localhost@" + kdc.getRealm();

        System.out.println("keytab "+keytab+"hdfsPrincipal "+hdfsPrincipal);

    }


    @AfterClass

    public static void shutdownKdc() {

        if (kdc != null) {

            kdc.stop();

        }

        FileUtil.fullyDelete(baseDir);

    }






   

    private void startCluster(HdfsConfiguration conf) throws IOException {

        cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();//

        cluster.waitActive();

        yarnCluster = new MiniYARNCluster("MiniClusterStartsWithCountJobTest", // testName

                1, // number of node managers

                1, // number of local log dirs per node manager

                1); // number of hdfs dirs per node manager

        yarnCluster.init(conf);




        yarnCluster.start();

        yarnCluster.getConfig().writeXml(new FileOutputStream(new File("conf.Xml")));

    }


    @Test

    public void testWithMiniCluster() throws Exception {


        HdfsConfiguration clusterConf = createSecureConfig("authentication,integrity,privacy");

        YarnConfiguration yarnConf =  createYarnSecureConfig();

        clusterConf.addResource(yarnConf);

        startCluster(clusterConf);


        Configuration conf = new Configuration();

       conf.addResource(FileUtils.openInputStream(new File("conf.Xml")));

 

        String IN_DIR = "testing/wordcount/input";

        String OUT_DIR = "testing/wordcount/output";

        String DATA_FILE = "sample.txt";


        FileSystem fs = FileSystem.get(conf);

        Path inDir = new Path(IN_DIR);

        Path outDir = new Path(OUT_DIR);


        fs.delete(inDir, true);

        fs.delete(outDir, true);


        // create the input data files

        List<String> content = new ArrayList<String>();

        content.add("She sells seashells at the seashore, and she sells nuts in the mountain.");

        writeHDFSContent(fs, inDir, DATA_FILE, content);


        // set up the job, submit the job and wait for it complete

     

        Job job = Job.getInstance(conf);


        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(BasicWordCount.TokenizerMapper.class);

        job.setReducerClass(BasicWordCount.IntSumReducer.class);

        FileInputFormat.addInputPath(job, inDir);

        FileOutputFormat.setOutputPath(job, outDir);

        job.waitForCompletion(true);

        assertTrue(job.isSuccessful());


        // now check that the output is as expected

        List<String> results = getJobResults(fs, outDir, 11);


        assertTrue(results.contains("She\t1"));

        assertTrue(results.contains("sells\t2"));



        // clean up after test case

        fs.delete(inDir, true);

        fs.delete(outDir, true);

    }

   /* @Test

    public void wordcount() throws Exception {


        HdfsConfiguration clusterConf = createSecureConfig("authentication,integrity,privacy");

        YarnConfiguration yarnConf =  createYarnSecureConfig();

        clusterConf.addResource(yarnConf);

        startCluster(clusterConf);


        Configuration conf = new Configuration();

        conf.addResource(FileUtils.openInputStream(new File("conf.Xml")));

        String IN_DIR = "testing/wordcount/input";

        String OUT_DIR = "testing/wordcount/output";

        String DATA_FILE = "sample.txt";


        FileSystem fs = FileSystem.get(conf);

        Path inDir = new Path(IN_DIR);

        Path outDir = new Path(OUT_DIR);


        fs.delete(inDir, true);

        fs.delete(outDir, true);


        // create the input data files

        List<String> content = new ArrayList<String>();

        content.add("She sells seashells at the seashore, and she sells nuts in the mountain.");

        writeHDFSContent(fs, inDir, DATA_FILE, content);

        String[] args = new String[]{IN_DIR,OUT_DIR};

        int exitCode = ToolRunner.run(conf,new WordCount(), args);

        fs.delete(inDir, true);

        fs.delete(outDir, true);

    }*/

    private void writeHDFSContent(FileSystem fs, Path dir, String fileName, List<String> content) throws IOException {

        Path newFilePath = new Path(dir, fileName);

        FSDataOutputStream out = fs.create(newFilePath);

        for (String line : content) {

            out.writeBytes(line);

        }

        out.close();

    }


    protected List<String> getJobResults(FileSystem fs, Path outDir, int numLines) throws Exception {

        List<String> results = new ArrayList<String>();

        FileStatus[] fileStatus = fs.listStatus(outDir);

        for (FileStatus file : fileStatus) {

            String name = file.getPath().getName();

            if (name.contains("part-r-00000")) {

                Path filePath = new Path(outDir + "/" + name);

                BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(filePath)));

                for (int i = 0; i < numLines; i++) {

                    String line = reader.readLine();

                    if (line == null) {

                        fail("Results are not what was expected");

                    }

                    System.out.println("line info: "+line);

                    results.add(line);

                }

                assertNull(reader.readLine());

                reader.close();

            }

        }

        return results;

    }


    private HdfsConfiguration createSecureConfig(String dataTransferProtection) throws Exception {

        HdfsConfiguration conf = new HdfsConfiguration();

        SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS, conf);

        conf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal);

        conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, keytab);

        conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal);

        conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, keytab);

        conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, spnegoPrincipal);

        conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);

        conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, dataTransferProtection);

        conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());

        conf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");

        conf.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");

        conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY, 10);

        conf.set(DFS_ENCRYPT_DATA_TRANSFER_KEY, "true");//https://issues.apache.org/jira/browse/HDFS-7431

        String keystoresDir = baseDir.getAbsolutePath();

        String sslConfDir = KeyStoreTestUtil.getClasspathDir(this.getClass());

        KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, conf, false);

        return conf;

    }

    private YarnConfiguration createYarnSecureConfig(){


        YarnConfiguration conf = new YarnConfiguration();



        //yarn secure config

        conf.set("yarn.resourcemanager.keytab", keytab);

        conf.set("yarn.resourcemanager.principal", hdfsPrincipal);


        conf.set("yarn.nodemanager.keytab", keytab);

        conf.set("yarn.nodemanager.principal", hdfsPrincipal);

        //   conf.set("yarn.nodemanager.container-executor.class", "org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor");

        conf.set("yarn.nodemanager.container-executor.class", "org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor");

       

        conf.set("yarn.nodemanager.linux-container-executor.path", "/container/container-executor");

        conf.set("mapreduce.jobhistory.keytab", keytab);

        conf.set("mapreduce.jobhistory.principal", hdfsPrincipal);

        conf.set("yarn.nodemanager.aux-services", "mapreduce_shuffle");//https://issues.apache.org/jira/browse/YARN-1289


        //enable security


        conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");

        //yarn

       conf.set("mapreduce.framework.name", "yarn");  //http://stackoverflow.com/questions/26567223/java-io-ioexception-cannot-initialize-cluster-in-hadoop2-with-yarn   use Yarn runner

        return conf;

    }

}



down vote

I have run into similar issues today. In my case I was building an über jar, where some dependency (I have not found the culprit yet) was bringing in a META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider with the contents:


org.apache.hadoop.mapred.LocalClientProtocolProvider

I provided my own in the project (e.g. put it on the classpath) with the following:


org.apache.hadoop.mapred.YarnClientProtocolProvider

and the correct one is picked up. I suspect you are seeing similar. To fix, please create the file described above, and put it on the classpath. If I find the culprit Jar, I will update the answer.



http://stackoverflow.com/questions/26567223/java-io-ioexception-cannot-initialize-cluster-in-hadoop2-with-yarn


hadoop-mapreduce-client-common-2.6.0.jar

#

#   Licensed under the Apache License, Version 2.0 (the "License");

#   you may not use this file except in compliance with the License.

#   You may obtain a copy of the License at

#

#       http://www.apache.org/licenses/LICENSE-2.0

#

#   Unless required by applicable law or agreed to in writing, software

#   distributed under the License is distributed on an "AS IS" BASIS,

#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

#   See the License for the specific language governing permissions and

#   limitations under the License.

#

#org.apache.hadoop.mapred.LocalClientProtocolProvider

org.apache.hadoop.mapred.YarnClientProtocolProvider


向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI