实现hive proxy5-数据目录权限问题解决

简介:

hive创建目录时相关的几个hdfs中的类:

1
2
3
org.apache.hadoop.hdfs.DistributedFileSystem,FileSystem 的具体实现类
org.apache.hadoop.hdfs.DFSClient,client操作hdfs文件系统的类
org.apache.hadoop.fs.permission.FsPermission 文件权限相关类,主要的方法有getUMask和applyUMask方法

org.apache.hadoop.hdfs.DistributedFileSystem中需要注意的几个方法:
initialize,主要用来初始DFSClient的实例:

1
2
3
4
5
6
7
8
9
10
11
12
   @Override
   public  void  initialize(URI uri, Configuration conf)  throws  IOException {
     super .initialize(uri, conf);
     setConf(conf);
     String host = uri.getHost();
     if  (host ==  null ) {
       throw  new  IOException( "Incomplete HDFS URI, no host: " + uri);
     }
     this .dfs =  new  DFSClient(uri, conf, statistics);
     this .uri = URI.create(uri.getScheme()+ "://" +uri.getAuthority());
     this .workingDir = getHomeDirectory();
   }

mkdir用来创建一个目录,mkdirs用来创建多个目录(类似于mkdir -p):

1
2
3
4
5
6
7
8
   public  boolean  mkdir(Path f, FsPermission permission)  throws  IOException {
     statistics.incrementWriteOps( 1 );
     return  dfs.mkdirs(getPathName(f), permission,  false );
   }
   public  boolean  mkdirs(Path f, FsPermission permission)  throws  IOException {
     statistics.incrementWriteOps( 1 );
     return  dfs.mkdirs(getPathName(f), permission,  true );
   }

两者最终调用的都是DFSClient.mkdirs方法,org.apache.hadoop.hdfs.DFSClient的mkdirs方法:

1
2
3
4
5
6
7
8
9
10
final  Conf dfsClientConf;
...
   public  boolean  mkdirs(String src, FsPermission permission,
       boolean  createParent)  throws  IOException {
     if  (permission ==  null ) {  //如果传入的权限为null
       permission = FsPermission.getDefault();
     }
     FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
     return  primitiveMkdir(src, masked, createParent);  //调用primitiveMkdir方法    
   }

这里需要注意 FsPermission.getDefault方法和Conf.uMask属性(Conf是DFSClient的内部类,主要用来设置默认配置)
Conf.uMask属性:

1
uMask = FsPermission.getUMask(conf);  //由getUMask获取

getUMask方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
  public  static  final  String DEPRECATED_UMASK_LABEL =  "dfs.umask" ;
   public  static  final  String UMASK_LABEL =
                   CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY;   //fs.permissions.umask-mode
   public  static  final  int  DEFAULT_UMASK =
                   CommonConfigurationKeys.FS_PERMISSIONS_UMASK_DEFAULT;  //0022
                      
   public  static  FsPermission getUMask(Configuration conf) {
     int  umask = DEFAULT_UMASK;
     if (conf !=  null ) {
       String confUmask = conf.get(UMASK_LABEL);
       int  oldUmask = conf.getInt(DEPRECATED_UMASK_LABEL, Integer.MIN_VALUE);  //老的配置项:dfs.umask,默认值为Integer.MIN_VALUE(-2147483648)
       try  {
         if (confUmask !=  null ) {  //如果设置了fs.permissions.umask-mode,则按这个umask,否则为默认的umask(0022)
           umask =  new  UmaskParser(confUmask).getUMask();
         }
       catch (IllegalArgumentException iae) {
         // Provide more explanation for user-facing message
         String type = iae  instanceof  NumberFormatException ?  "decimal"
             "octal or symbolic" ;
         String error =  "Unable to parse configuration "  + UMASK_LABEL
             " with value "  + confUmask +  " as "  + type +  " umask." ;
         LOG.warn(error);
        
         // If oldUmask is not set, then throw the exception
         if  (oldUmask == Integer.MIN_VALUE) {
           throw  new  IllegalArgumentException(error);
         }
       }
        
       if (oldUmask != Integer.MIN_VALUE) {  //如果手动设置了老的配置项dfs.umask
         if  (umask != oldUmask) {  //并且dfs.umask的值不等于0022
           LOG.warn(DEPRECATED_UMASK_LABEL
               " configuration key is deprecated. "  "Convert to "
               + UMASK_LABEL +  ", using octal or symbolic umask "
               "specifications." );
           // Old and new umask values do not match - Use old umask
           umask = oldUmask;  //umask为默认值0022
         }
       }
     }
    
     return  new  FsPermission(( short )umask);
   }

在hive中创建hdfs的目录有两种方法
1)通过Utilities的createDirsWithPermission方法,这种方法会重设fs.permissions.umask-mode
2)直接通过DistributedFileSystem的mkdirs方法创建
两者最终都是调用了DFSClient的mkdirs方法,不同的是调用Utilities.createDirsWithPermission创建的目录权限在proxy时权限有可能是777(因为手动设置了权限为777),
比如:
Context类的构造函数中创建临时文件目录通过Context.getMRScratchDir调getLocalScratchDir(local job)或getScratchDir(非local job),其中getScratchDir中调用Utilities.createDirsWithPermission方法调用目录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public  static  boolean  createDirsWithPermission(Configuration conf, Path mkdirPath,
     FsPermission fsPermission,  boolean  recursive)  throws  IOException {
   String origUmask =  null ;
   LOG.warn( "Create dirs "  + mkdirPath +  " with permission "  + fsPermission +  " recursive "  +
       recursive);
   if  (recursive) {
   //如果recursive为true,设置fs.permissions.umask-mode为000,
   //默认情况下recursive = SessionState.get().isHiveServerQuery() &&conf.getBoolean(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname,HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.defaultBoolVal);
   //即时来自hiveserver的请求,并且开启了doas,这里还会把权限设置为777(这里我增加了一个逻辑,如果设置了proxy,recursive也为true)
   /**
   boolean recursive = false;
   if (SessionState.get() != null) {
     recursive = (SessionState.get().isHiveServerQuery() &&
         conf.getBoolean(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname,
             HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.defaultBoolVal))||(HiveConf.getBoolVar(conf,HiveConf.ConfVars.HIVE_USE_CUSTOM_PROXY));
     fsPermission = new FsPermission((short)00777);
   }
   */
     origUmask = conf.get( "fs.permissions.umask-mode" );
     conf.set( "fs.permissions.umask-mode" "000" );
   }
   FileSystem fs = ShimLoader.getHadoopShims().getNonCachedFileSystem(mkdirPath.toUri(), conf);
   //这里是DFSClient的实例
   boolean  retval =  false ;
   try  {
     retval = fs.mkdirs(mkdirPath, fsPermission);
     resetConfAndCloseFS(conf, recursive, origUmask, fs);
   catch  (IOException ioe) {
     try  {
       resetConfAndCloseFS(conf, recursive, origUmask, fs);  //调用resetConfAndCloseFS,reset fs.permissions.umask-mode的设置
     }
     catch  (IOException e) {
       // do nothing - double failure
     }
   }
   return  retval;
}

resetConfAndCloseFS方法用来重设fs.permissions.umask-mode的设置,这样如果后面创建目录不是使用Utilities.createDirsWithPermission就会使用这个重设的配置

1
2
3
4
5
6
7
8
9
10
11
private  static  void  resetConfAndCloseFS (Configuration conf,  boolean  unsetUmask,
     String origUmask, FileSystem fs)  throws  IOException {
   if  (unsetUmask) {  //unsetUmask为true,即recursive为true的话,需要重设fs.permissions.umask-mode
     if  (origUmask !=  null ) {  //如果有设置项的话,使用设置项
       conf.set( "fs.permissions.umask-mode" , origUmask);
     else  {
       conf.unset( "fs.permissions.umask-mode" );  //这里虽然可以unset,后面会有默认值
     }
   }
   fs.close();
}

通过查看DFSClient的源码,发现在DFSClient的构造函数中会初始化ugi的信息,默认为当前用户

1
2
3
4
5
6
7
8
9
10
11
final  UserGroupInformation ugi;
...
this .ugi = UserGroupInformation.getCurrentUser(); 
如果更改成proxy用户,通过运行hadoop fs -mkdir测试,发现生成的文件目录属主还是当前登录用户
更改DFSClient的构造方法:
//this.ugi = UserGroupInformation.getCurrentUser();
if (conf.getBoolean( "use.custom.proxy" , false )){
   this .ugi = UserGroupInformation.createRemoteUser(conf.get( "custom.proxy.user" ));
} else {
   this .ugi = UserGroupInformation.getCurrentUser();
}

在hdfs-site.xml配置中增加:
dfs配置中增加:

1
2
3
4
5
6
7
8
<property>
     <name>use.custom.proxy</name>        
     <value> true </value>
</property>
<property>
     <name>custom.proxy.user</name>       
     <value>ericni</value>
</property>

使用hdfs创建目录后,目录的属主仍然是hdfs,而数据写入的用户为提交job的用户。
因为上面的原因,要想使创建的hdfs的目录属主为proxy的用户,可以采用创建完后设置owner的方法。
通过查看DistributedFileSystem类的api,发现有setOwner的方法。
以insert overwrite 语句为例,在mapred job提交之前,会根据job的上下文内容,创建map和reduce的临时目录,这个目录是最终数据落地的目录,落地之后,在job完成的finally阶段,会通过MoveTask移动到对应的目录下面临时数据写入目录在ExecDriver类的execute方法中生成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public  int  execute(DriverContext driverContext) {
   IOPrepareCache ioPrepareCache = IOPrepareCache.get();
   ioPrepareCache.clear();
   boolean  success =  true ;
   Context ctx = driverContext.getCtx();
   boolean  ctxCreated =  false ;
   Path emptyScratchDir;
   MapWork mWork = work.getMapWork();
   ReduceWork rWork = work.getReduceWork();
   try  {
     if  (ctx ==  null ) {
       ctx =  new  Context(job);
       ctxCreated =  true ;
     }
     emptyScratchDir = ctx.getMRTmpPath();
     FileSystem fs = emptyScratchDir.getFileSystem(job);
     fs.mkdirs(emptyScratchDir);
   catch  (IOException e) {
     e.printStackTrace();
     console.printError( "Error launching map-reduce job" "\n"
         + org.apache.hadoop.util.StringUtils.stringifyException(e));
     return  5 ;
   }
....
   List<Path> inputPaths = Utilities.getInputPaths(job, mWork, emptyScratchDir, ctx);   //获取输入目录
   Utilities.setInputPaths(job, inputPaths);
   Utilities.setMapRedWork(job, work, ctx.getMRTmpPath());
....
   Utilities.createTmpDirs(job, mWork);  //创建map临时目录
   Utilities.createTmpDirs(job, rWork);  //创建reduce临时目录
1
2
3
4
5
6
7
8
9
10
11
一种思路,在外层创建目录后setOwner,可以在Utilities中增加一个方法调用setOwner:
public  static  void  setDirWithOwner(Configuration conf,Path mkdirPath,
         String username,String groupname)  throws   IOException {
    LOG.warn( "in Utilities setDirWithOwner path: "  + mkdirPath +  ",username: "  + username +  ",groupname: "  + groupname);
    FileSystem fs = ShimLoader.getHadoopShims().getNonCachedFileSystem(mkdirPath.toUri(), conf);
    try  {
       fs.setOwner(mkdirPath, username, groupname);  //调用DistributedFileSystem.setOwner方法
    } catch  (IOException ios) {
      //no-op
   
}

同时更改createTmpDirs方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
   private  static  void  createTmpDirs(Configuration conf,
       List<Operator<?  extends  OperatorDesc>> ops)  throws  IOException {
     FsPermission fsPermission =  new  FsPermission(( short ) 00777 );
     while  (!ops.isEmpty()) {
       Operator<?  extends  OperatorDesc> op = ops.remove( 0 );
       if  (op  instanceof  FileSinkOperator) {
         FileSinkDesc fdesc = ((FileSinkOperator) op).getConf();  //org.apache.hadoop.hive.ql.plan.FileSinkDesc
         Path tempDir = fdesc.getDirName();  //获取目录名
         if  (tempDir !=  null ) {
           Path tempPath = Utilities.toTempPath(tempDir);   //目录增加_tmp.前缀
           createDirsWithPermission(conf, tempPath, fsPermission);
             if  (conf.getBoolean( "use.custom.proxy" , false )) {  //如果设置了use.custom.proxy,则调用setDirWithOwner方法,设置目录权限
           LOG.warn( "set owner after create dirs" );
           String username = conf.get( "custom.proxy.user" );
           setDirWithOwner(conf,tempPath,username, null );
         }
         }
       }
       if  (op.getChildOperators() !=  null ) {
         ops.addAll(op.getChildOperators());
       }
     }
   }

上面这种方法有一定的局限性,比如是使用了Utilities.createTmpDirs的方法创建的目录才有用(比如map或者reduce的临时数据目录)。
可以通过改下层的实现:
在DFSClient中增加一个setOwner方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public  boolean  setOwner(String src, String username)  throws  IOException {
     boolean  setResult =  false ;
     checkOpen();
     try  {
         namenode.setOwner(src, username,  null );
         setResult =  true ;
     catch (RemoteException re) {
         throw  re.unwrapRemoteException(AccessControlException. class ,
                                        FileNotFoundException. class ,
                                        SafeModeException. class ,
                                        UnresolvedPathException. class );
    } finally {
         return  setResult;
    }
}

同时更改primitiveMkdir为如下内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public  boolean  primitiveMkdir(String src, FsPermission absPermission,
   boolean  createParent)
   throws  IOException {
   checkOpen();
   boolean  MkRe;
   boolean  SetRe;
   if  (absPermission ==  null ) {
     absPermission =
       FsPermission.getDefault().applyUMask(dfsClientConf.uMask);
   }
   if (LOG.isDebugEnabled()) {
     LOG.debug(src +  ": masked="  + absPermission);
   }
   try  {
       MkRe = namenode.mkdirs(src, absPermission, createParent);  //namenode:org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB
       if  ( this .conf.getBoolean( "use.custom.proxy" , false )){
           LOG.warn( "change primitiveMkdir add conf: "  this .conf.getBoolean( "use.custom.proxy" , false ));
           LOG.warn( "change primitiveMkdir add conf: "  this .conf.get( "custom.proxy.user" ));
           String username =  this .conf.get( "custom.proxy.user" );
             if  (( "" ).equals(username)||username ==  null ||( "hdfs" ).equals(username)){
                 //no-op
                  SetRe =  true ;
             } else {
                  SetRe = setOwner(src,username);
             }      
       } else  {
           SetRe =  true ;
       }
       return  MkRe&&SetRe;
   catch (RemoteException re) {
     throw  re.unwrapRemoteException(AccessControlException. class ,
                                    InvalidPathException. class ,
                                    FileAlreadyExistsException. class ,
                                    FileNotFoundException. class ,
                                    ParentNotDirectoryException. class ,
                                    SafeModeException. class ,
                                    NSQuotaExceededException. class ,
                                    DSQuotaExceededException. class ,
                                    UnresolvedPathException. class );
   }
}

这样,只要是调用了DFSClient的primitiveMkdir方法创建的目录(正常情况下创建目录都会调用primitiveMkdir方法),在proxy的情况下都可以更改目录。


到这里,hive的proxy算是开发完成了,为了实现proxy的功能,对hive和hadoop的代码更改如下:

1.HiveConf中增加两个配置项
2.重写HadoopDefaultAuthenticator的setConf方法
3.更改Context构造方法中关于scratch目录的项
4.更改Utilities中的createDirsWithPermission方法和createTmpDirs方法,并新增setDirWithOwner方法
5.更改HiveHistoryImpl构造方法中关于日志路径的项
6.更改JobClient的init方法
7.更改DFSClient的构造方法,增加一个setOwner方法,同时更改primitiveMkdir方法



本文转自菜菜光 51CTO博客,原文链接:http://blog.51cto.com/caiguangguang/1591572,如需转载请自行联系原作者

相关文章
|
4月前
|
SQL 数据采集 数据挖掘
大数据行业应用之Hive数据分析航班线路相关的各项指标
大数据行业应用之Hive数据分析航班线路相关的各项指标
99 1
|
4月前
|
SQL 存储 大数据
【大数据技术Hadoop+Spark】Hive基础SQL语法DDL、DML、DQL讲解及演示(附SQL语句)
【大数据技术Hadoop+Spark】Hive基础SQL语法DDL、DML、DQL讲解及演示(附SQL语句)
77 0
|
6月前
|
SQL 分布式计算 大数据
黑马程序员-大数据入门到实战-分布式SQL计算 Hive 入门
黑马程序员-大数据入门到实战-分布式SQL计算 Hive 入门
71 0
|
6月前
|
SQL Java 大数据
Hive实战(03)-深入了解Hive JDBC:在大数据世界中实现数据交互
Hive实战(03)-深入了解Hive JDBC:在大数据世界中实现数据交互
219 1
|
4月前
|
SQL 分布式计算 数据库
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)
97 0
|
4月前
|
SQL 存储 分布式计算
【大数据技术Hadoop+Spark】Hive数据仓库架构、优缺点、数据模型介绍(图文解释 超详细)
【大数据技术Hadoop+Spark】Hive数据仓库架构、优缺点、数据模型介绍(图文解释 超详细)
186 0
|
6月前
|
SQL 存储 大数据
黑马程序员-大数据入门到实战-分布式SQL计算 Hive 语法与概念
黑马程序员-大数据入门到实战-分布式SQL计算 Hive 语法与概念
76 0