Hadoop RPC Code Analysis
A typical RPC framework has to address the following concerns (a rough mapping to Hadoop's ipc classes follows the list):
- Net IO: maintaining network connections, send/receive handling, async & event mechanisms (epoll);
- Serialize/Deserialize: encoding and decoding of request/response data;
- Request/Response management: dispatching requests and responses, concurrency control, retransmission, timeout handling;
- Interface: the call entry point exposed to the user;
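Informally, and only as orientation for the sections below, these concerns map onto the Hadoop `org.apache.hadoop.ipc` classes roughly as follows:

```
Net IO                   -> Client.Connection (client side); Server.Listener / Reader / Responder (server side)
Serialize/Deserialize    -> RpcEngine implementations: ProtobufRpcEngine(2) / WritableRpcEngine
Request/Response manage  -> Client.Call + callId, retry policies; Server.CallQueueManager
Interface                -> java.lang.reflect.Proxy generated from the protocol interface
```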
1 Client
- Two serialization engines: ProtobufRpcEngine and WritableRpcEngine;
- class Call: each request is assigned a callId from callIdCounter, which wraps around at 0x7FFFFFFF;
- getConnection: obtains a Connection; the request is then sent on that connection via connection.sendRpcRequest(call);
- getRpcResponse: retrieves the response;
- The user invokes the RPC through the proxy interface; the request goes out via connection.sendRpcRequest(call) and the calling thread blocks until the response arrives (see the sketch after this list);
- Connections are cached in a ConcurrentMap<ConnectionId, Connection>;
- The outstanding Calls on a Connection are tracked in a Hashtable<Integer, Call>, keyed by callId;
- Connection extends Thread; its background loop receives server responses: run() -> while (waitForWork()) receiveRpcResponse();
- During getConnection, setupIOstreams creates the input/output streams, and authentication is performed while the streams are being set up.
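Putting these pieces together, the client-side path for one request looks roughly like this (a simplified flow based on the method names above, not the literal source):

```
Client.call(rpcKind, rpcRequest, remoteId)
  ├─ createCall(rpcKind, rpcRequest)        // assigns callId from callIdCounter
  ├─ getConnection(remoteId, call, ...)     // reuse or create a Connection; setupIOstreams() + auth
  ├─ connection.sendRpcRequest(call)        // serialize and write the request
  └─ getRpcResponse(call, connection, ...)  // block until the Connection thread's
                                            // receiveRpcResponse() completes the call
```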
1.1 DFSClient: An Instantiation Example
The DFSClient constructor creates proxyInfo and builds the namenode proxy:
```java
if (proxyInfo != null) {
  // ...
} else if (rpcNamenode != null) {
  // This case is used for testing.
} else {
  Preconditions.checkArgument(nameNodeUri != null, "null URI");
  proxyInfo = NameNodeProxiesClient.createProxyWithClientProtocol(
      conf, nameNodeUri, nnFallbackToSimpleAuth);
  this.dtService = proxyInfo.getDelegationTokenService();
  this.namenode = proxyInfo.getProxy();
}

public static ProxyAndInfo<ClientProtocol> createProxyWithClientProtocol(
    Configuration conf, URI nameNodeUri, AtomicBoolean fallbackToSimpleAuth)
    throws IOException {
  AbstractNNFailoverProxyProvider<ClientProtocol> failoverProxyProvider =
      createFailoverProxyProvider(conf, nameNodeUri, ClientProtocol.class, true,
          fallbackToSimpleAuth);

  if (failoverProxyProvider == null) {
    InetSocketAddress nnAddr = DFSUtilClient.getNNAddress(nameNodeUri);
    Text dtService = SecurityUtil.buildTokenService(nnAddr);
    ClientProtocol proxy = createNonHAProxyWithClientProtocol(nnAddr, conf,
        UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth);
    return new ProxyAndInfo<>(proxy, dtService, nnAddr);
  } else {
    return createHAProxy(conf, nameNodeUri, ClientProtocol.class,
        failoverProxyProvider);
  }
}

public static ClientProtocol createNonHAProxyWithClientProtocol(
    InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
    boolean withRetries, AtomicBoolean fallbackToSimpleAuth) throws IOException {
  return createProxyWithAlignmentContext(address, conf, ugi, withRetries,
      fallbackToSimpleAuth, null);
}

public static ClientProtocol createProxyWithAlignmentContext(
    InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
    boolean withRetries, AtomicBoolean fallbackToSimpleAuth,
    AlignmentContext alignmentContext) throws IOException {
  RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
      ProtobufRpcEngine2.class);

  final RetryPolicy defaultPolicy = RetryUtils.getDefaultRetryPolicy(
      conf,
      HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY,
      HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT,
      HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY,
      HdfsClientConfigKeys.Retry.POLICY_SPEC_DEFAULT,
      SafeModeException.class.getName());

  final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
  ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
      ClientNamenodeProtocolPB.class, version, address, ugi, conf,
      NetUtils.getDefaultSocketFactory(conf),
      org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy,
      fallbackToSimpleAuth, alignmentContext).getProxy();

  if (withRetries) { // create the proxy with retries
    Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<>();
    ClientProtocol translatorProxy =
        new ClientNamenodeProtocolTranslatorPB(proxy);
    return (ClientProtocol) RetryProxy.create(
        ClientProtocol.class,
        new DefaultFailoverProxyProvider<>(ClientProtocol.class, translatorProxy),
        methodNameToPolicyMap,
        defaultPolicy);
  } else {
    return new ClientNamenodeProtocolTranslatorPB(proxy);
  }
}
```
In the non-HA branch the chain is NameNodeProxiesClient.createProxyWithClientProtocol → createNonHAProxyWithClientProtocol → createProxyWithAlignmentContext → RPC.getProtocolProxy; the resulting ClientNamenodeProtocolPB proxy is wrapped in a ClientNamenodeProtocolTranslatorPB (and optionally a RetryProxy).
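For orientation, here is a minimal usage sketch (assuming an HDFS cluster is configured as the default filesystem); FileSystem.get() ends up constructing a DFSClient, which builds exactly the proxy chain shown above:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CreateDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();      // assumes fs.defaultFS = hdfs://...
    FileSystem fs = FileSystem.get(conf);          // DistributedFileSystem -> DFSClient
    // fs.create() eventually reaches dfsClient.namenode.create(), i.e. an RPC to the NameNode.
    try (FSDataOutputStream out = fs.create(new Path("/tmp/rpc-demo.txt"))) {
      out.writeUTF("hello hdfs rpc");
    }
  }
}
```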
1.2 ClientNamenodeProtocolTranslatorPB
```java
/**
 * This class forwards NN's ClientProtocol calls as RPC calls to the NN server
 * while translating from the parameter types used in ClientProtocol to the
 * new PB types.
 */
```
Take create as an example:
```java
public final FSDataOutputStream create(final Path f,
    final EnumSet<CreateFlag> createFlag, Options.CreateOpts... opts)
    throws AccessControlException, FileAlreadyExistsException,
    FileNotFoundException, ParentNotDirectoryException,
    UnsupportedFileSystemException, UnresolvedLinkException, IOException {
  checkPath(f);
  int bufferSize = -1;
  short replication = -1;
  long blockSize = -1;
  int bytesPerChecksum = -1;
  ChecksumOpt checksumOpt = null;
  FsPermission permission = null;
  Progressable progress = null;
  Boolean createParent = null;

  for (CreateOpts iOpt : opts) {
    if (CreateOpts.BlockSize.class.isInstance(iOpt)) {
      if (blockSize != -1) {
        throw new HadoopIllegalArgumentException(
            "BlockSize option is set multiple times");
      }
      blockSize = ((CreateOpts.BlockSize) iOpt).getValue();
    } else if (CreateOpts.BufferSize.class.isInstance(iOpt)) {
      if (bufferSize != -1) {
        throw new HadoopIllegalArgumentException(
            "BufferSize option is set multiple times");
      }
      bufferSize = ((CreateOpts.BufferSize) iOpt).getValue();
    } else if (...) {
      // ...
    } else {
      throw new HadoopIllegalArgumentException("Unkown CreateOpts of type " +
          iOpt.getClass().getName());
    }
  }
  // ...
  return this.createInternal(f, createFlag, permission, bufferSize, replication,
      blockSize, progress, checksumOpt, createParent);
}

@Override
public HdfsDataOutputStream createInternal(Path f,
    EnumSet<CreateFlag> createFlag, FsPermission absolutePermission,
    int bufferSize, short replication, long blockSize, Progressable progress,
    ChecksumOpt checksumOpt, boolean createParent) throws IOException {
  final DFSOutputStream dfsos = dfs.primitiveCreate(getUriPath(f),
      absolutePermission, createFlag, createParent, replication, blockSize,
      progress, bufferSize, checksumOpt);
  return dfs.createWrappedOutputStream(dfsos, statistics,
      dfsos.getInitialLen());
}

public DFSOutputStream primitiveCreate(String src, FsPermission absPermission,
    EnumSet<CreateFlag> flag, boolean createParent, short replication,
    long blockSize, Progressable progress, int buffersize,
    ChecksumOpt checksumOpt) throws IOException {
  checkOpen();
  CreateFlag.validate(flag);
  DFSOutputStream result = primitiveAppend(src, flag, progress);
  if (result == null) {
    DataChecksum checksum = dfsClientConf.createChecksum(checksumOpt);
    result = DFSOutputStream.newStreamForCreate(this, src, absPermission, flag,
        createParent, replication, blockSize, progress, checksum,
        null, null, null);
  }
  beginFileLease(result.getFileId(), result);
  return result;
}

static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
    FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
    short replication, long blockSize, Progressable progress,
    DataChecksum checksum, String[] favoredNodes, String ecPolicyName,
    String storagePolicy) throws IOException {
  try (TraceScope ignored =
           dfsClient.newPathTraceScope("newStreamForCreate", src)) {
    HdfsFileStatus stat = null;

    // Retry the create if we get a RetryStartFileException up to a maximum
    // number of times
    boolean shouldRetry = true;
    int retryCount = CREATE_RETRY_COUNT;
    while (shouldRetry) {
      shouldRetry = false;
      try {
        stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
            new EnumSetWritable<>(flag), createParent, replication, blockSize,
            SUPPORTED_CRYPTO_VERSIONS, ecPolicyName, storagePolicy);
        break;
      } catch (RemoteException re) {
        IOException e = re.unwrapRemoteException(
            AccessControlException.class,
            DSQuotaExceededException.class,
            QuotaByStorageTypeExceededException.class,
            FileAlreadyExistsException.class,
            FileNotFoundException.class,
            ParentNotDirectoryException.class,
            NSQuotaExceededException.class,
            RetryStartFileException.class,
            SafeModeException.class,
            UnresolvedPathException.class,
            SnapshotAccessControlException.class,
            UnknownCryptoProtocolVersionException.class);
        if (e instanceof RetryStartFileException) {
          if (retryCount > 0) {
            shouldRetry = true;
            retryCount--;
          } else {
            throw new IOException("Too many retries because of encryption" +
                " zone operations", e);
          }
        } else {
          throw e;
        }
      }
    }
    Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
    final DFSOutputStream out;
    if (stat.getErasureCodingPolicy() != null) {
      out = new DFSStripedOutputStream(dfsClient, src, stat, flag, progress,
          checksum, favoredNodes);
    } else {
      out = new DFSOutputStream(dfsClient, src, stat, flag, progress, checksum,
          favoredNodes, true);
    }
    out.start();
    return out;
  }
}

// ClientNamenodeProtocolTranslatorPB.java
@Override
public HdfsFileStatus create(String src, FsPermission masked,
    String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
    short replication, long blockSize,
    CryptoProtocolVersion[] supportedVersions, String ecPolicyName,
    String storagePolicy) throws IOException {
  CreateRequestProto.Builder builder = CreateRequestProto.newBuilder()
      .setSrc(src)
      .setMasked(PBHelperClient.convert(masked))
      .setClientName(clientName)
      .setCreateFlag(PBHelperClient.convertCreateFlag(flag))
      .setCreateParent(createParent)
      .setReplication(replication)
      .setBlockSize(blockSize);
  if (ecPolicyName != null) {
    builder.setEcPolicyName(ecPolicyName);
  }
  if (storagePolicy != null) {
    builder.setStoragePolicy(storagePolicy);
  }
  FsPermission unmasked = masked.getUnmasked();
  if (unmasked != null) {
    builder.setUnmasked(PBHelperClient.convert(unmasked));
  }
  builder.addAllCryptoProtocolVersion(
      PBHelperClient.convert(supportedVersions));
  CreateRequestProto req = builder.build();
  try {
    // NameNodeProxiesClient.java init rpcProxy from RPC.getProtocolProxy.
    CreateResponseProto res = rpcProxy.create(null, req);
    return res.hasFs() ? PBHelperClient.convert(res.getFs()) : null;
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }
}

// bind to invoke
@Override
public Message invoke(Object proxy, final Method method, Object[] args)
    throws ServiceException {
  long startTime = 0;
  if (LOG.isDebugEnabled()) {
    startTime = Time.now();
  }

  if (args.length != 2) { // RpcController + Message
    throw new ServiceException(
        "Too many or few parameters for request. Method: ["
            + method.getName() + "]" + ", Expected: 2, Actual: " + args.length);
  }
  if (args[1] == null) {
    throw new ServiceException("null param while calling Method: ["
        + method.getName() + "]");
  }

  // if Tracing is on then start a new span for this rpc.
  // guard it in the if statement to make sure there isn't
  // any extra string manipulation.
  Tracer tracer = Tracer.curThreadTracer();
  TraceScope traceScope = null;
  if (tracer != null) {
    traceScope = tracer.newScope(RpcClientUtil.methodToTraceString(method));
  }

  if (LOG.isTraceEnabled()) {
    LOG.trace(Thread.currentThread().getId() + ": Call -> " +
        remoteId + ": " + method.getName() +
        " {" + TextFormat.shortDebugString((Message) args[1]) + "}");
  }

  final Message theRequest = (Message) args[1];
  final RpcWritable.Buffer val;
  try {
    val = (RpcWritable.Buffer) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
        constructRpcRequest(method, theRequest), remoteId,
        fallbackToSimpleAuth, alignmentContext);
  } catch (Throwable e) {
    if (LOG.isTraceEnabled()) {
      LOG.trace(Thread.currentThread().getId() + ": Exception <- " +
          remoteId + ": " + method.getName() + " {" + e + "}");
    }
    if (traceScope != null) {
      traceScope.addTimelineAnnotation("Call got exception: " + e.toString());
    }
    throw new ServiceException(e);
  } finally {
    if (traceScope != null) {
      traceScope.close();
    }
  }

  if (LOG.isDebugEnabled()) {
    long callTime = Time.now() - startTime;
    LOG.debug("Call: " + method.getName() + " took " + callTime + "ms");
  }

  if (Client.isAsynchronousMode()) {
    final AsyncGet<RpcWritable.Buffer, IOException> arr =
        Client.getAsyncRpcResponse();
    final AsyncGet<Message, Exception> asyncGet =
        new AsyncGet<Message, Exception>() {
          @Override
          public Message get(long timeout, TimeUnit unit) throws Exception {
            return getReturnMessage(method, arr.get(timeout, unit));
          }

          @Override
          public boolean isDone() {
            return arr.isDone();
          }
        };
    ASYNC_RETURN_MESSAGE.set(asyncGet);
    return null;
  } else {
    return getReturnMessage(method, val);
  }
}
```
The translator hands the RPC request to rpcProxy; the dynamic proxy routes the call to Invoker.invoke() (shown above), which serializes the protobuf message and performs the actual client.call().
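Summarizing the call stack traced above, the client-side layering for create() looks roughly like this (simplified):

```
DistributedFileSystem / FileContext
  └─ DFSClient.namenode (ClientProtocol)
       └─ RetryProxy (dynamic proxy, applies RetryPolicy)
            └─ ClientNamenodeProtocolTranslatorPB      -- Java args -> CreateRequestProto
                 └─ rpcProxy (ClientNamenodeProtocolPB, dynamic proxy)
                      └─ ProtobufRpcEngine2.Invoker.invoke()
                           └─ Client.call(RPC_PROTOCOL_BUFFER, ...)  -- request on the wire
```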
1.3 ProtobufRpcEngine2
```java
@Override
@SuppressWarnings("unchecked")
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
    ConnectionId connId, Configuration conf, SocketFactory factory)
    throws IOException {
  final Invoker invoker = new Invoker(protocol, connId, conf, factory);
  return new ProtocolProxy<T>(protocol,
      (T) Proxy.newProxyInstance(protocol.getClassLoader(),
          new Class[] {protocol}, invoker),
      false);
}
```
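getProxy() relies on java.lang.reflect.Proxy: every method call on the protocol interface is routed to Invoker.invoke(). A minimal, self-contained illustration of that mechanism (plain JDK code, not Hadoop source; Greeter and ProxyDemo are made-up names):

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

interface Greeter { String greet(String name); }

public class ProxyDemo {
  public static void main(String[] args) {
    // In Hadoop, the handler is ProtobufRpcEngine2.Invoker: it serializes the
    // request and calls Client.call() instead of answering locally.
    InvocationHandler handler = (proxy, method, a) ->
        "hello, " + a[0] + " (via " + method.getName() + ")";
    Greeter g = (Greeter) Proxy.newProxyInstance(
        Greeter.class.getClassLoader(), new Class<?>[] {Greeter.class}, handler);
    System.out.println(g.greet("hdfs"));
  }
}
```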
2 Server
The Server is created via getServer and follows the Reactor pattern.
- RpcRequestWrapper / RpcResponseWrapper wrap the request and response messages;
- RpcInvoker invokes the service methods via reflection;
- An NIO-based Reactor drives the Accept, Read, and Write stages (a minimal sketch of the pattern follows this list);
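Below is a minimal, simplified NIO reactor sketch (not Hadoop code) showing the select/accept/read loop the ipc Server is built around; Hadoop splits these roles across a Listener thread, several Reader threads, Handler threads, and a Responder:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;

public class MiniReactor {
  public static void main(String[] args) throws IOException {
    Selector selector = Selector.open();
    ServerSocketChannel acceptor = ServerSocketChannel.open();
    acceptor.bind(new InetSocketAddress(8020));      // port chosen for illustration only
    acceptor.configureBlocking(false);
    acceptor.register(selector, SelectionKey.OP_ACCEPT);   // "Listener" role

    while (true) {
      selector.select();
      for (SelectionKey key : selector.selectedKeys()) {
        if (key.isAcceptable()) {                          // new connection
          SocketChannel ch = ((ServerSocketChannel) key.channel()).accept();
          ch.configureBlocking(false);
          ch.register(selector, SelectionKey.OP_READ);     // hand off to "Reader" role
        } else if (key.isReadable()) {                     // request bytes arrived
          ByteBuffer buf = ByteBuffer.allocate(4096);
          int n = ((SocketChannel) key.channel()).read(buf);
          if (n < 0) { key.channel().close(); continue; }
          // In Hadoop, the decoded RPC request would be queued for the Handler
          // threads; the Responder writes the reply when the channel is writable.
        }
      }
      selector.selectedKeys().clear();
    }
  }
}
```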
```java
public static class Server extends RPC.Server {
}

@Override
public RPC.Server getServer(Class<?> protocol, Object protocolImpl,
    String bindAddress, int port, int numHandlers, int numReaders,
    int queueSizePerHandler, boolean verbose, Configuration conf,
    SecretManager<? extends TokenIdentifier> secretManager,
    String portRangeConfig, AlignmentContext alignmentContext)
    throws IOException {
  return new Server(protocol, protocolImpl, conf, bindAddress, port,
      numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
      portRangeConfig, alignmentContext);
}

// hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
public abstract class Server {
  /**
   * Constructs a server listening on the named port and address. Parameters passed must
   * be of the named class. The <code>handlerCount</code> determines
   * the number of handler threads that will be used to process calls.
   * If queueSizePerHandler or numReaders are not -1 they will be used instead of parameters
   * from configuration. Otherwise the configuration will be picked up.
   *
   * If rpcRequestClass is null then the rpcRequestClass must have been
   * registered via {@link #registerProtocolEngine(RPC.RpcKind,
   * Class, RPC.RpcInvoker)}
   * This parameter has been retained for compatibility with existing tests
   * and usage.
   *
   * @param bindAddress input bindAddress.
   * @param port input port.
   * @param rpcRequestClass input rpcRequestClass.
   * @param handlerCount input handlerCount.
   * @param numReaders input numReaders.
   * @param queueSizePerHandler input queueSizePerHandler.
   * @param conf input Configuration.
   * @param serverName input serverName.
   * @param secretManager input secretManager.
   * @param portRangeConfig input portRangeConfig.
   * @throws IOException raised on errors performing I/O.
   */
  @SuppressWarnings("unchecked")
  protected Server(String bindAddress, int port,
      Class<? extends Writable> rpcRequestClass, int handlerCount,
      int numReaders, int queueSizePerHandler, Configuration conf,
      String serverName,
      SecretManager<? extends TokenIdentifier> secretManager,
      String portRangeConfig) throws IOException {
    this.bindAddress = bindAddress;
    this.conf = conf;
    this.portRangeConfig = portRangeConfig;
    this.port = port;
    this.rpcRequestClass = rpcRequestClass;
    this.handlerCount = handlerCount;
    this.socketSendBufferSize = 0;
    this.serverName = serverName;
    this.auxiliaryListenerMap = null;
    this.maxDataLength = conf.getInt(
        CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
        CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
    if (queueSizePerHandler != -1) {
      this.maxQueueSize = handlerCount * queueSizePerHandler;
    } else {
      this.maxQueueSize = handlerCount * conf.getInt(
          CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
          CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
    }
    this.maxRespSize = conf.getInt(
        CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY,
        CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT);
    if (numReaders != -1) {
      this.readThreads = numReaders;
    } else {
      this.readThreads = conf.getInt(
          CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
          CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT);
    }
    this.readerPendingConnectionQueue = conf.getInt(
        CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY,
        CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT);

    // Setup appropriate callqueue
    final String prefix = getQueueClassPrefix();
    this.callQueue = new CallQueueManager<>(
        getQueueClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf),
        getSchedulerClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf),
        getClientBackoffEnable(CommonConfigurationKeys.IPC_NAMESPACE, port, conf),
        maxQueueSize, prefix, conf);

    this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
    this.authorize = conf.getBoolean(
        CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false);

    // configure supported authentications
    this.enabledAuthMethods = getAuthMethods(secretManager, conf);
    this.negotiateResponse = buildNegotiateResponse(enabledAuthMethods);

    // Start the listener here and let it bind to the port
    listener = new Listener(port);
    // set the server port to the default listener port.
    this.port = listener.getAddress().getPort();
    connectionManager = new ConnectionManager();
    this.rpcMetrics = RpcMetrics.create(this, conf);
    this.rpcDetailedMetrics = RpcDetailedMetrics.create(this.port);
    this.tcpNoDelay = conf.getBoolean(
        CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY,
        CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_DEFAULT);
    this.setLogSlowRPC(conf.getBoolean(
        CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC,
        CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_DEFAULT));
    this.setPurgeIntervalNanos(conf.getInt(
        CommonConfigurationKeysPublic.IPC_SERVER_PURGE_INTERVAL_MINUTES_KEY,
        CommonConfigurationKeysPublic.IPC_SERVER_PURGE_INTERVAL_MINUTES_DEFAULT));

    // Create the responder here
    responder = new Responder();

    if (secretManager != null || UserGroupInformation.isSecurityEnabled()) {
      SaslRpcServer.init(conf);
      saslPropsResolver = SaslPropertiesResolver.getInstance(conf);
    }

    this.exceptionsHandler.addTerseLoggingExceptions(StandbyException.class);
    this.exceptionsHandler.addTerseLoggingExceptions(
        HealthCheckFailedException.class);
    this.metricsUpdaterInterval = conf.getLong(
        CommonConfigurationKeysPublic.IPC_SERVER_METRICS_UPDATE_RUNNER_INTERVAL,
        CommonConfigurationKeysPublic.IPC_SERVER_METRICS_UPDATE_RUNNER_INTERVAL_DEFAULT);
    this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
        new ThreadFactoryBuilder().setDaemon(true)
            .setNameFormat("Hadoop-Metrics-Updater-%d").build());
    this.scheduledExecutorService.scheduleWithFixedDelay(new MetricsUpdateRunner(),
        metricsUpdaterInterval, metricsUpdaterInterval, TimeUnit.MILLISECONDS);
  }

  public synchronized void addAuxiliaryListener(int auxiliaryPort)
      throws IOException {
    if (auxiliaryListenerMap == null) {
      auxiliaryListenerMap = new HashMap<>();
    }
    if (auxiliaryListenerMap.containsKey(auxiliaryPort) && auxiliaryPort != 0) {
      throw new IOException(
          "There is already a listener binding to: " + auxiliaryPort);
    }
    Listener newListener = new Listener(auxiliaryPort);
    newListener.setIsAuxiliary();

    // in the case of port = 0, the listener would be on a != 0 port.
    LOG.info("Adding a server listener on port " +
        newListener.getAddress().getPort());
    auxiliaryListenerMap.put(newListener.getAddress().getPort(), newListener);
  }
}
```
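The constructor above wires up the server's thread model; schematically (simplified):

```
Client connections
      │ accept
      ▼
  Listener (1 thread, OP_ACCEPT) ──round robin──▶ Reader[readThreads] (OP_READ, decode RPC requests)
      │
      ▼
  CallQueueManager (callQueue, bounded by maxQueueSize)
      │
      ▼
  Handler[handlerCount] (deserialize, authorize, invoke the protocol implementation)
      │
      ▼
  Responder (1 thread, OP_WRITE, sends queued responses back to clients)
```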
2.1 Code Trace
```java
// NameNodeRpcServer.java
public NameNodeRpcServer(Configuration conf, NameNode nn) throws IOException {
  ClientNamenodeProtocolServerSideTranslatorPB clientProtocolServerTranslator =
      new ClientNamenodeProtocolServerSideTranslatorPB(this);
  BlockingService clientNNPbService = ClientNamenodeProtocol.
      newReflectiveBlockingService(clientProtocolServerTranslator);

  InetSocketAddress serviceRpcAddr = nn.getServiceRpcServerAddress(conf);
  if (serviceRpcAddr != null) {
    int serviceHandlerCount = conf.getInt(
        DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
        DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
    serviceRpcServer = new RPC.Builder(conf)
        .setProtocol(
            org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
        .setInstance(clientNNPbService)
        .setBindAddress(bindHost)
        .setPort(serviceRpcAddr.getPort())
        .setNumHandlers(serviceHandlerCount)
        .setVerbose(false)
        .setSecretManager(namesystem.getDelegationTokenSecretManager())
        .build();

    // Update the address with the correct port
    InetSocketAddress listenAddr = serviceRpcServer.getListenerAddress();
    serviceRPCAddress = new InetSocketAddress(
        serviceRpcAddr.getHostName(), listenAddr.getPort());
    nn.setRpcServiceServerAddress(conf, serviceRPCAddress);
  }
}

// ClientNamenodeProtocolServerSideTranslatorPB.java
@Override
public CreateResponseProto create(RpcController controller,
    CreateRequestProto req) throws ServiceException {
  try {
    FsPermission masked = req.hasUnmasked()
        ? FsCreateModes.create(PBHelperClient.convert(req.getMasked()),
            PBHelperClient.convert(req.getUnmasked()))
        : PBHelperClient.convert(req.getMasked());
    HdfsFileStatus result = server.create(req.getSrc(), masked,
        req.getClientName(),
        PBHelperClient.convertCreateFlag(req.getCreateFlag()),
        req.getCreateParent(), (short) req.getReplication(),
        req.getBlockSize(),
        PBHelperClient.convertCryptoProtocolVersions(
            req.getCryptoProtocolVersionList()),
        req.getEcPolicyName(), req.getStoragePolicy());

    if (result != null) {
      return CreateResponseProto.newBuilder()
          .setFs(PBHelperClient.convert(result)).build();
    }
    return VOID_CREATE_RESPONSE;
  } catch (IOException e) {
    throw new ServiceException(e);
  }
}

// hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
// Optional.
@Override // ClientProtocol
public HdfsFileStatus create(String src, FsPermission masked,
    String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
    short replication, long blockSize,
    CryptoProtocolVersion[] supportedVersions, String ecPolicyName,
    String storagePolicy) throws IOException {
  return clientProto.create(src, masked, clientName, flag, createParent,
      replication, blockSize, supportedVersions, ecPolicyName, storagePolicy);
}

// hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@Override // ClientProtocol
public HdfsFileStatus create(String src, FsPermission masked,
    String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
    short replication, long blockSize,
    CryptoProtocolVersion[] supportedVersions, String ecPolicyName,
    String storagePolicy) throws IOException {
  checkNNStartup();
  String clientMachine = getClientMachine();
  if (stateChangeLog.isDebugEnabled()) {
    stateChangeLog.debug("*DIR* NameNode.create: file "
        + src + " for " + clientName + " at " + clientMachine);
  }
  if (!checkPathLength(src)) {
    throw new IOException("create: Pathname too long. Limit "
        + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
  }
  namesystem.checkOperation(OperationCategory.WRITE);
  CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null);
  if (cacheEntry != null && cacheEntry.isSuccess()) {
    return (HdfsFileStatus) cacheEntry.getPayload();
  }

  HdfsFileStatus status = null;
  try {
    PermissionStatus perm = new PermissionStatus(getRemoteUser()
        .getShortUserName(), null, masked);
    status = namesystem.startFile(src, perm, clientName, clientMachine,
        flag.get(), createParent, replication, blockSize, supportedVersions,
        ecPolicyName, storagePolicy, cacheEntry != null);
  } finally {
    RetryCache.setState(cacheEntry, status != null, status);
  }

  metrics.incrFilesCreated();
  metrics.incrCreateFileOps();
  return status;
}

// hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
/**
 * Create a new file entry in the namespace.
 *
 * For description of parameters and exceptions thrown see
 * {@link ClientProtocol#create}, except it returns valid file status upon
 * success
 */
HdfsFileStatus startFile(String src, PermissionStatus permissions,
    String holder, String clientMachine, EnumSet<CreateFlag> flag,
    boolean createParent, short replication, long blockSize,
    CryptoProtocolVersion[] supportedVersions, String ecPolicyName,
    String storagePolicy, boolean logRetryCache) throws IOException {

  HdfsFileStatus status;
  try {
    status = startFileInt(src, permissions, holder, clientMachine, flag,
        createParent, replication, blockSize, supportedVersions, ecPolicyName,
        storagePolicy, logRetryCache);
  } catch (AccessControlException e) {
    logAuditEvent(false, "create", src);
    throw e;
  }
  logAuditEvent(true, "create", src, status);
  return status;
}
```