Loading libs/binder/FdTrigger.cpp +14 −4 Original line number Diff line number Diff line Loading @@ -22,6 +22,7 @@ #include <poll.h> #include <android-base/macros.h> #include <android-base/scopeguard.h> #include "RpcState.h" namespace android { Loading Loading @@ -53,25 +54,34 @@ bool FdTrigger::isTriggered() { #endif } status_t FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) { status_t FdTrigger::triggerablePoll(const android::TransportFd& transportFd, int16_t event) { #ifdef BINDER_RPC_SINGLE_THREADED if (mTriggered) { return DEAD_OBJECT; } #endif LOG_ALWAYS_FATAL_IF(event == 0, "triggerablePoll %d with event 0 is not allowed", fd.get()); LOG_ALWAYS_FATAL_IF(event == 0, "triggerablePoll %d with event 0 is not allowed", transportFd.fd.get()); pollfd pfd[]{ {.fd = fd.get(), .events = static_cast<int16_t>(event), .revents = 0}, {.fd = transportFd.fd.get(), .events = static_cast<int16_t>(event), .revents = 0}, #ifndef BINDER_RPC_SINGLE_THREADED {.fd = mRead.get(), .events = 0, .revents = 0}, #endif }; LOG_ALWAYS_FATAL_IF(transportFd.isInPollingState() == true, "Only one thread should be polling on Fd!"); transportFd.setPollingState(true); auto pollingStateGuard = android::base::make_scope_guard([&]() { transportFd.setPollingState(false); }); int ret = TEMP_FAILURE_RETRY(poll(pfd, arraysize(pfd), -1)); if (ret < 0) { return -errno; } LOG_ALWAYS_FATAL_IF(ret == 0, "poll(%d) returns 0 with infinite timeout", fd.get()); LOG_ALWAYS_FATAL_IF(ret == 0, "poll(%d) returns 0 with infinite timeout", transportFd.fd.get()); // At least one FD has events. Check them. Loading libs/binder/FdTrigger.h +3 −1 Original line number Diff line number Diff line Loading @@ -21,6 +21,8 @@ #include <android-base/unique_fd.h> #include <utils/Errors.h> #include <binder/RpcTransport.h> namespace android { /** This is not a pipe. */ Loading Loading @@ -53,7 +55,7 @@ public: * true - time to read! * false - trigger happened */ [[nodiscard]] status_t triggerablePoll(base::borrowed_fd fd, int16_t event); [[nodiscard]] status_t triggerablePoll(const android::TransportFd& transportFd, int16_t event); private: #ifdef BINDER_RPC_SINGLE_THREADED Loading libs/binder/RpcServer.cpp +19 −19 Original line number Diff line number Diff line Loading @@ -86,7 +86,7 @@ status_t RpcServer::setupInetServer(const char* address, unsigned int port, LOG_ALWAYS_FATAL_IF(socketAddress.addr()->sa_family != AF_INET, "expecting inet"); sockaddr_in addr{}; socklen_t len = sizeof(addr); if (0 != getsockname(mServer.get(), reinterpret_cast<sockaddr*>(&addr), &len)) { if (0 != getsockname(mServer.fd.get(), reinterpret_cast<sockaddr*>(&addr), &len)) { int savedErrno = errno; ALOGE("Could not getsockname at %s: %s", socketAddress.toString().c_str(), strerror(savedErrno)); Loading Loading @@ -181,7 +181,7 @@ void RpcServer::join() { { RpcMutexLockGuard _l(mLock); LOG_ALWAYS_FATAL_IF(!mServer.ok(), "RpcServer must be setup to join."); LOG_ALWAYS_FATAL_IF(!mServer.fd.ok(), "RpcServer must be setup to join."); LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr, "Already joined"); mJoinThreadRunning = true; mShutdownTrigger = FdTrigger::make(); Loading @@ -194,24 +194,24 @@ void RpcServer::join() { static_assert(addr.size() >= sizeof(sockaddr_storage), "kRpcAddressSize is too small"); socklen_t addrLen = addr.size(); unique_fd clientFd( TEMP_FAILURE_RETRY(accept4(mServer.get(), reinterpret_cast<sockaddr*>(addr.data()), &addrLen, SOCK_CLOEXEC | SOCK_NONBLOCK))); TransportFd clientSocket(unique_fd(TEMP_FAILURE_RETRY( accept4(mServer.fd.get(), reinterpret_cast<sockaddr*>(addr.data()), &addrLen, SOCK_CLOEXEC | SOCK_NONBLOCK)))); LOG_ALWAYS_FATAL_IF(addrLen > static_cast<socklen_t>(sizeof(sockaddr_storage)), "Truncated address"); if (clientFd < 0) { if (clientSocket.fd < 0) { ALOGE("Could not accept4 socket: %s", strerror(errno)); continue; } LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.get(), clientFd.get()); LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.fd.get(), clientSocket.fd.get()); { RpcMutexLockGuard _l(mLock); RpcMaybeThread thread = RpcMaybeThread(&RpcServer::establishConnection, sp<RpcServer>::fromExisting(this), std::move(clientFd), addr, sp<RpcServer>::fromExisting(this), std::move(clientSocket), addr, addrLen, RpcSession::join); auto& threadRef = mConnectingThreads[thread.get_id()]; Loading Loading @@ -296,7 +296,7 @@ size_t RpcServer::numUninitializedSessions() { } void RpcServer::establishConnection( sp<RpcServer>&& server, base::unique_fd clientFd, std::array<uint8_t, kRpcAddressSize> addr, sp<RpcServer>&& server, TransportFd clientFd, std::array<uint8_t, kRpcAddressSize> addr, size_t addrLen, std::function<void(sp<RpcSession>&&, RpcSession::PreJoinSetupResult&&)>&& joinFn) { // mShutdownTrigger can only be cleared once connection threads have joined. Loading @@ -306,7 +306,7 @@ void RpcServer::establishConnection( status_t status = OK; int clientFdForLog = clientFd.get(); int clientFdForLog = clientFd.fd.get(); auto client = server->mCtx->newTransport(std::move(clientFd), server->mShutdownTrigger.get()); if (client == nullptr) { ALOGE("Dropping accept4()-ed socket because sslAccept fails"); Loading Loading @@ -488,15 +488,15 @@ status_t RpcServer::setupSocketServer(const RpcSocketAddress& addr) { LOG_RPC_DETAIL("Setting up socket server %s", addr.toString().c_str()); LOG_ALWAYS_FATAL_IF(hasServer(), "Each RpcServer can only have one server."); unique_fd serverFd(TEMP_FAILURE_RETRY( socket(addr.addr()->sa_family, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0))); if (serverFd == -1) { TransportFd transportFd(unique_fd(TEMP_FAILURE_RETRY( socket(addr.addr()->sa_family, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0)))); if (!transportFd.fd.ok()) { int savedErrno = errno; ALOGE("Could not create socket: %s", strerror(savedErrno)); return -savedErrno; } if (0 != TEMP_FAILURE_RETRY(bind(serverFd.get(), addr.addr(), addr.addrSize()))) { if (0 != TEMP_FAILURE_RETRY(bind(transportFd.fd.get(), addr.addr(), addr.addrSize()))) { int savedErrno = errno; ALOGE("Could not bind socket at %s: %s", addr.toString().c_str(), strerror(savedErrno)); return -savedErrno; Loading @@ -506,7 +506,7 @@ status_t RpcServer::setupSocketServer(const RpcSocketAddress& addr) { // the backlog is increased to a large number. // TODO(b/189955605): Once we create threads dynamically & lazily, the backlog can be reduced // to 1. if (0 != TEMP_FAILURE_RETRY(listen(serverFd.get(), 50 /*backlog*/))) { if (0 != TEMP_FAILURE_RETRY(listen(transportFd.fd.get(), 50 /*backlog*/))) { int savedErrno = errno; ALOGE("Could not listen socket at %s: %s", addr.toString().c_str(), strerror(savedErrno)); return -savedErrno; Loading @@ -514,7 +514,7 @@ status_t RpcServer::setupSocketServer(const RpcSocketAddress& addr) { LOG_RPC_DETAIL("Successfully setup socket server %s", addr.toString().c_str()); if (status_t status = setupExternalServer(std::move(serverFd)); status != OK) { if (status_t status = setupExternalServer(std::move(transportFd.fd)); status != OK) { ALOGE("Another thread has set up server while calling setupSocketServer. Race?"); return status; } Loading Loading @@ -542,17 +542,17 @@ void RpcServer::onSessionIncomingThreadEnded() { bool RpcServer::hasServer() { RpcMutexLockGuard _l(mLock); return mServer.ok(); return mServer.fd.ok(); } unique_fd RpcServer::releaseServer() { RpcMutexLockGuard _l(mLock); return std::move(mServer); return std::move(mServer.fd); } status_t RpcServer::setupExternalServer(base::unique_fd serverFd) { RpcMutexLockGuard _l(mLock); if (mServer.ok()) { if (mServer.fd.ok()) { ALOGE("Each RpcServer can only have one server."); return INVALID_OPERATION; } Loading libs/binder/RpcSession.cpp +17 −10 Original line number Diff line number Diff line Loading @@ -162,7 +162,8 @@ status_t RpcSession::setupInetClient(const char* addr, unsigned int port) { return NAME_NOT_FOUND; } status_t RpcSession::setupPreconnectedClient(unique_fd fd, std::function<unique_fd()>&& request) { status_t RpcSession::setupPreconnectedClient(base::unique_fd fd, std::function<unique_fd()>&& request) { return setupClient([&](const std::vector<uint8_t>& sessionId, bool incoming) -> status_t { if (!fd.ok()) { fd = request(); Loading @@ -172,7 +173,9 @@ status_t RpcSession::setupPreconnectedClient(unique_fd fd, std::function<unique_ ALOGE("setupPreconnectedClient: %s", res.error().message().c_str()); return res.error().code() == 0 ? UNKNOWN_ERROR : -res.error().code(); } status_t status = initAndAddConnection(std::move(fd), sessionId, incoming); TransportFd transportFd(std::move(fd)); status_t status = initAndAddConnection(std::move(transportFd), sessionId, incoming); fd = unique_fd(); // Explicitly reset after move to avoid analyzer warning. return status; }); Loading @@ -190,7 +193,8 @@ status_t RpcSession::addNullDebuggingClient() { return -savedErrno; } auto server = mCtx->newTransport(std::move(serverFd), mShutdownTrigger.get()); TransportFd transportFd(std::move(serverFd)); auto server = mCtx->newTransport(std::move(transportFd), mShutdownTrigger.get()); if (server == nullptr) { ALOGE("Unable to set up RpcTransport"); return UNKNOWN_ERROR; Loading Loading @@ -572,12 +576,14 @@ status_t RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, return -savedErrno; } if (0 != TEMP_FAILURE_RETRY(connect(serverFd.get(), addr.addr(), addr.addrSize()))) { TransportFd transportFd(std::move(serverFd)); if (0 != TEMP_FAILURE_RETRY(connect(transportFd.fd.get(), addr.addr(), addr.addrSize()))) { int connErrno = errno; if (connErrno == EAGAIN || connErrno == EINPROGRESS) { // For non-blocking sockets, connect() may return EAGAIN (for unix domain socket) or // EINPROGRESS (for others). Call poll() and getsockopt() to get the error. status_t pollStatus = mShutdownTrigger->triggerablePoll(serverFd, POLLOUT); status_t pollStatus = mShutdownTrigger->triggerablePoll(transportFd, POLLOUT); if (pollStatus != OK) { ALOGE("Could not POLLOUT after connect() on non-blocking socket: %s", statusToString(pollStatus).c_str()); Loading @@ -585,8 +591,8 @@ status_t RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, } // Set connErrno to the errno that connect() would have set if the fd were blocking. socklen_t connErrnoLen = sizeof(connErrno); int ret = getsockopt(serverFd.get(), SOL_SOCKET, SO_ERROR, &connErrno, &connErrnoLen); int ret = getsockopt(transportFd.fd.get(), SOL_SOCKET, SO_ERROR, &connErrno, &connErrnoLen); if (ret == -1) { int savedErrno = errno; ALOGE("Could not getsockopt() after connect() on non-blocking socket: %s. " Loading @@ -608,16 +614,17 @@ status_t RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, return -connErrno; } } LOG_RPC_DETAIL("Socket at %s client with fd %d", addr.toString().c_str(), serverFd.get()); LOG_RPC_DETAIL("Socket at %s client with fd %d", addr.toString().c_str(), transportFd.fd.get()); return initAndAddConnection(std::move(serverFd), sessionId, incoming); return initAndAddConnection(std::move(transportFd), sessionId, incoming); } ALOGE("Ran out of retries to connect to %s", addr.toString().c_str()); return UNKNOWN_ERROR; } status_t RpcSession::initAndAddConnection(unique_fd fd, const std::vector<uint8_t>& sessionId, status_t RpcSession::initAndAddConnection(TransportFd fd, const std::vector<uint8_t>& sessionId, bool incoming) { LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr); auto server = mCtx->newTransport(std::move(fd), mShutdownTrigger.get()); Loading libs/binder/RpcTransportRaw.cpp +15 −13 Original line number Diff line number Diff line Loading @@ -36,11 +36,11 @@ constexpr size_t kMaxFdsPerMsg = 253; // RpcTransport with TLS disabled. class RpcTransportRaw : public RpcTransport { public: explicit RpcTransportRaw(android::base::unique_fd socket) : mSocket(std::move(socket)) {} explicit RpcTransportRaw(android::TransportFd socket) : mSocket(std::move(socket)) {} status_t pollRead(void) override { uint8_t buf; ssize_t ret = TEMP_FAILURE_RETRY( ::recv(mSocket.get(), &buf, sizeof(buf), MSG_PEEK | MSG_DONTWAIT)); ::recv(mSocket.fd.get(), &buf, sizeof(buf), MSG_PEEK | MSG_DONTWAIT)); if (ret < 0) { int savedErrno = errno; if (savedErrno == EAGAIN || savedErrno == EWOULDBLOCK) { Loading Loading @@ -100,7 +100,7 @@ public: msg.msg_controllen = CMSG_SPACE(fdsByteSize); ssize_t processedSize = TEMP_FAILURE_RETRY( sendmsg(mSocket.get(), &msg, MSG_NOSIGNAL | MSG_CMSG_CLOEXEC)); sendmsg(mSocket.fd.get(), &msg, MSG_NOSIGNAL | MSG_CMSG_CLOEXEC)); if (processedSize > 0) { sentFds = true; } Loading @@ -113,10 +113,10 @@ public: // non-negative int and can be cast to either. .msg_iovlen = static_cast<decltype(msg.msg_iovlen)>(niovs), }; return TEMP_FAILURE_RETRY(sendmsg(mSocket.get(), &msg, MSG_NOSIGNAL)); return TEMP_FAILURE_RETRY(sendmsg(mSocket.fd.get(), &msg, MSG_NOSIGNAL)); }; return interruptableReadOrWrite(mSocket.get(), fdTrigger, iovs, niovs, send, "sendmsg", POLLOUT, altPoll); return interruptableReadOrWrite(mSocket, fdTrigger, iovs, niovs, send, "sendmsg", POLLOUT, altPoll); } status_t interruptableReadFully( Loading @@ -135,7 +135,7 @@ public: .msg_controllen = sizeof(msgControlBuf), }; ssize_t processSize = TEMP_FAILURE_RETRY(recvmsg(mSocket.get(), &msg, MSG_NOSIGNAL)); TEMP_FAILURE_RETRY(recvmsg(mSocket.fd.get(), &msg, MSG_NOSIGNAL)); if (processSize < 0) { return -1; } Loading Loading @@ -171,21 +171,23 @@ public: // non-negative int and can be cast to either. .msg_iovlen = static_cast<decltype(msg.msg_iovlen)>(niovs), }; return TEMP_FAILURE_RETRY(recvmsg(mSocket.get(), &msg, MSG_NOSIGNAL)); return TEMP_FAILURE_RETRY(recvmsg(mSocket.fd.get(), &msg, MSG_NOSIGNAL)); }; return interruptableReadOrWrite(mSocket.get(), fdTrigger, iovs, niovs, recv, "recvmsg", POLLIN, altPoll); return interruptableReadOrWrite(mSocket, fdTrigger, iovs, niovs, recv, "recvmsg", POLLIN, altPoll); } virtual bool isWaiting() { return mSocket.isInPollingState(); } private: base::unique_fd mSocket; android::TransportFd mSocket; }; // RpcTransportCtx with TLS disabled. class RpcTransportCtxRaw : public RpcTransportCtx { public: std::unique_ptr<RpcTransport> newTransport(android::base::unique_fd fd, FdTrigger*) const { return std::make_unique<RpcTransportRaw>(std::move(fd)); std::unique_ptr<RpcTransport> newTransport(android::TransportFd socket, FdTrigger*) const { return std::make_unique<RpcTransportRaw>(std::move(socket)); } std::vector<uint8_t> getCertificate(RpcCertificateFormat) const override { return {}; } }; Loading Loading
libs/binder/FdTrigger.cpp +14 −4 Original line number Diff line number Diff line Loading @@ -22,6 +22,7 @@ #include <poll.h> #include <android-base/macros.h> #include <android-base/scopeguard.h> #include "RpcState.h" namespace android { Loading Loading @@ -53,25 +54,34 @@ bool FdTrigger::isTriggered() { #endif } status_t FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) { status_t FdTrigger::triggerablePoll(const android::TransportFd& transportFd, int16_t event) { #ifdef BINDER_RPC_SINGLE_THREADED if (mTriggered) { return DEAD_OBJECT; } #endif LOG_ALWAYS_FATAL_IF(event == 0, "triggerablePoll %d with event 0 is not allowed", fd.get()); LOG_ALWAYS_FATAL_IF(event == 0, "triggerablePoll %d with event 0 is not allowed", transportFd.fd.get()); pollfd pfd[]{ {.fd = fd.get(), .events = static_cast<int16_t>(event), .revents = 0}, {.fd = transportFd.fd.get(), .events = static_cast<int16_t>(event), .revents = 0}, #ifndef BINDER_RPC_SINGLE_THREADED {.fd = mRead.get(), .events = 0, .revents = 0}, #endif }; LOG_ALWAYS_FATAL_IF(transportFd.isInPollingState() == true, "Only one thread should be polling on Fd!"); transportFd.setPollingState(true); auto pollingStateGuard = android::base::make_scope_guard([&]() { transportFd.setPollingState(false); }); int ret = TEMP_FAILURE_RETRY(poll(pfd, arraysize(pfd), -1)); if (ret < 0) { return -errno; } LOG_ALWAYS_FATAL_IF(ret == 0, "poll(%d) returns 0 with infinite timeout", fd.get()); LOG_ALWAYS_FATAL_IF(ret == 0, "poll(%d) returns 0 with infinite timeout", transportFd.fd.get()); // At least one FD has events. Check them. Loading
libs/binder/FdTrigger.h +3 −1 Original line number Diff line number Diff line Loading @@ -21,6 +21,8 @@ #include <android-base/unique_fd.h> #include <utils/Errors.h> #include <binder/RpcTransport.h> namespace android { /** This is not a pipe. */ Loading Loading @@ -53,7 +55,7 @@ public: * true - time to read! * false - trigger happened */ [[nodiscard]] status_t triggerablePoll(base::borrowed_fd fd, int16_t event); [[nodiscard]] status_t triggerablePoll(const android::TransportFd& transportFd, int16_t event); private: #ifdef BINDER_RPC_SINGLE_THREADED Loading
libs/binder/RpcServer.cpp +19 −19 Original line number Diff line number Diff line Loading @@ -86,7 +86,7 @@ status_t RpcServer::setupInetServer(const char* address, unsigned int port, LOG_ALWAYS_FATAL_IF(socketAddress.addr()->sa_family != AF_INET, "expecting inet"); sockaddr_in addr{}; socklen_t len = sizeof(addr); if (0 != getsockname(mServer.get(), reinterpret_cast<sockaddr*>(&addr), &len)) { if (0 != getsockname(mServer.fd.get(), reinterpret_cast<sockaddr*>(&addr), &len)) { int savedErrno = errno; ALOGE("Could not getsockname at %s: %s", socketAddress.toString().c_str(), strerror(savedErrno)); Loading Loading @@ -181,7 +181,7 @@ void RpcServer::join() { { RpcMutexLockGuard _l(mLock); LOG_ALWAYS_FATAL_IF(!mServer.ok(), "RpcServer must be setup to join."); LOG_ALWAYS_FATAL_IF(!mServer.fd.ok(), "RpcServer must be setup to join."); LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr, "Already joined"); mJoinThreadRunning = true; mShutdownTrigger = FdTrigger::make(); Loading @@ -194,24 +194,24 @@ void RpcServer::join() { static_assert(addr.size() >= sizeof(sockaddr_storage), "kRpcAddressSize is too small"); socklen_t addrLen = addr.size(); unique_fd clientFd( TEMP_FAILURE_RETRY(accept4(mServer.get(), reinterpret_cast<sockaddr*>(addr.data()), &addrLen, SOCK_CLOEXEC | SOCK_NONBLOCK))); TransportFd clientSocket(unique_fd(TEMP_FAILURE_RETRY( accept4(mServer.fd.get(), reinterpret_cast<sockaddr*>(addr.data()), &addrLen, SOCK_CLOEXEC | SOCK_NONBLOCK)))); LOG_ALWAYS_FATAL_IF(addrLen > static_cast<socklen_t>(sizeof(sockaddr_storage)), "Truncated address"); if (clientFd < 0) { if (clientSocket.fd < 0) { ALOGE("Could not accept4 socket: %s", strerror(errno)); continue; } LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.get(), clientFd.get()); LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.fd.get(), clientSocket.fd.get()); { RpcMutexLockGuard _l(mLock); RpcMaybeThread thread = RpcMaybeThread(&RpcServer::establishConnection, sp<RpcServer>::fromExisting(this), std::move(clientFd), addr, sp<RpcServer>::fromExisting(this), std::move(clientSocket), addr, addrLen, RpcSession::join); auto& threadRef = mConnectingThreads[thread.get_id()]; Loading Loading @@ -296,7 +296,7 @@ size_t RpcServer::numUninitializedSessions() { } void RpcServer::establishConnection( sp<RpcServer>&& server, base::unique_fd clientFd, std::array<uint8_t, kRpcAddressSize> addr, sp<RpcServer>&& server, TransportFd clientFd, std::array<uint8_t, kRpcAddressSize> addr, size_t addrLen, std::function<void(sp<RpcSession>&&, RpcSession::PreJoinSetupResult&&)>&& joinFn) { // mShutdownTrigger can only be cleared once connection threads have joined. Loading @@ -306,7 +306,7 @@ void RpcServer::establishConnection( status_t status = OK; int clientFdForLog = clientFd.get(); int clientFdForLog = clientFd.fd.get(); auto client = server->mCtx->newTransport(std::move(clientFd), server->mShutdownTrigger.get()); if (client == nullptr) { ALOGE("Dropping accept4()-ed socket because sslAccept fails"); Loading Loading @@ -488,15 +488,15 @@ status_t RpcServer::setupSocketServer(const RpcSocketAddress& addr) { LOG_RPC_DETAIL("Setting up socket server %s", addr.toString().c_str()); LOG_ALWAYS_FATAL_IF(hasServer(), "Each RpcServer can only have one server."); unique_fd serverFd(TEMP_FAILURE_RETRY( socket(addr.addr()->sa_family, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0))); if (serverFd == -1) { TransportFd transportFd(unique_fd(TEMP_FAILURE_RETRY( socket(addr.addr()->sa_family, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0)))); if (!transportFd.fd.ok()) { int savedErrno = errno; ALOGE("Could not create socket: %s", strerror(savedErrno)); return -savedErrno; } if (0 != TEMP_FAILURE_RETRY(bind(serverFd.get(), addr.addr(), addr.addrSize()))) { if (0 != TEMP_FAILURE_RETRY(bind(transportFd.fd.get(), addr.addr(), addr.addrSize()))) { int savedErrno = errno; ALOGE("Could not bind socket at %s: %s", addr.toString().c_str(), strerror(savedErrno)); return -savedErrno; Loading @@ -506,7 +506,7 @@ status_t RpcServer::setupSocketServer(const RpcSocketAddress& addr) { // the backlog is increased to a large number. // TODO(b/189955605): Once we create threads dynamically & lazily, the backlog can be reduced // to 1. if (0 != TEMP_FAILURE_RETRY(listen(serverFd.get(), 50 /*backlog*/))) { if (0 != TEMP_FAILURE_RETRY(listen(transportFd.fd.get(), 50 /*backlog*/))) { int savedErrno = errno; ALOGE("Could not listen socket at %s: %s", addr.toString().c_str(), strerror(savedErrno)); return -savedErrno; Loading @@ -514,7 +514,7 @@ status_t RpcServer::setupSocketServer(const RpcSocketAddress& addr) { LOG_RPC_DETAIL("Successfully setup socket server %s", addr.toString().c_str()); if (status_t status = setupExternalServer(std::move(serverFd)); status != OK) { if (status_t status = setupExternalServer(std::move(transportFd.fd)); status != OK) { ALOGE("Another thread has set up server while calling setupSocketServer. Race?"); return status; } Loading Loading @@ -542,17 +542,17 @@ void RpcServer::onSessionIncomingThreadEnded() { bool RpcServer::hasServer() { RpcMutexLockGuard _l(mLock); return mServer.ok(); return mServer.fd.ok(); } unique_fd RpcServer::releaseServer() { RpcMutexLockGuard _l(mLock); return std::move(mServer); return std::move(mServer.fd); } status_t RpcServer::setupExternalServer(base::unique_fd serverFd) { RpcMutexLockGuard _l(mLock); if (mServer.ok()) { if (mServer.fd.ok()) { ALOGE("Each RpcServer can only have one server."); return INVALID_OPERATION; } Loading
libs/binder/RpcSession.cpp +17 −10 Original line number Diff line number Diff line Loading @@ -162,7 +162,8 @@ status_t RpcSession::setupInetClient(const char* addr, unsigned int port) { return NAME_NOT_FOUND; } status_t RpcSession::setupPreconnectedClient(unique_fd fd, std::function<unique_fd()>&& request) { status_t RpcSession::setupPreconnectedClient(base::unique_fd fd, std::function<unique_fd()>&& request) { return setupClient([&](const std::vector<uint8_t>& sessionId, bool incoming) -> status_t { if (!fd.ok()) { fd = request(); Loading @@ -172,7 +173,9 @@ status_t RpcSession::setupPreconnectedClient(unique_fd fd, std::function<unique_ ALOGE("setupPreconnectedClient: %s", res.error().message().c_str()); return res.error().code() == 0 ? UNKNOWN_ERROR : -res.error().code(); } status_t status = initAndAddConnection(std::move(fd), sessionId, incoming); TransportFd transportFd(std::move(fd)); status_t status = initAndAddConnection(std::move(transportFd), sessionId, incoming); fd = unique_fd(); // Explicitly reset after move to avoid analyzer warning. return status; }); Loading @@ -190,7 +193,8 @@ status_t RpcSession::addNullDebuggingClient() { return -savedErrno; } auto server = mCtx->newTransport(std::move(serverFd), mShutdownTrigger.get()); TransportFd transportFd(std::move(serverFd)); auto server = mCtx->newTransport(std::move(transportFd), mShutdownTrigger.get()); if (server == nullptr) { ALOGE("Unable to set up RpcTransport"); return UNKNOWN_ERROR; Loading Loading @@ -572,12 +576,14 @@ status_t RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, return -savedErrno; } if (0 != TEMP_FAILURE_RETRY(connect(serverFd.get(), addr.addr(), addr.addrSize()))) { TransportFd transportFd(std::move(serverFd)); if (0 != TEMP_FAILURE_RETRY(connect(transportFd.fd.get(), addr.addr(), addr.addrSize()))) { int connErrno = errno; if (connErrno == EAGAIN || connErrno == EINPROGRESS) { // For non-blocking sockets, connect() may return EAGAIN (for unix domain socket) or // EINPROGRESS (for others). Call poll() and getsockopt() to get the error. status_t pollStatus = mShutdownTrigger->triggerablePoll(serverFd, POLLOUT); status_t pollStatus = mShutdownTrigger->triggerablePoll(transportFd, POLLOUT); if (pollStatus != OK) { ALOGE("Could not POLLOUT after connect() on non-blocking socket: %s", statusToString(pollStatus).c_str()); Loading @@ -585,8 +591,8 @@ status_t RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, } // Set connErrno to the errno that connect() would have set if the fd were blocking. socklen_t connErrnoLen = sizeof(connErrno); int ret = getsockopt(serverFd.get(), SOL_SOCKET, SO_ERROR, &connErrno, &connErrnoLen); int ret = getsockopt(transportFd.fd.get(), SOL_SOCKET, SO_ERROR, &connErrno, &connErrnoLen); if (ret == -1) { int savedErrno = errno; ALOGE("Could not getsockopt() after connect() on non-blocking socket: %s. " Loading @@ -608,16 +614,17 @@ status_t RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, return -connErrno; } } LOG_RPC_DETAIL("Socket at %s client with fd %d", addr.toString().c_str(), serverFd.get()); LOG_RPC_DETAIL("Socket at %s client with fd %d", addr.toString().c_str(), transportFd.fd.get()); return initAndAddConnection(std::move(serverFd), sessionId, incoming); return initAndAddConnection(std::move(transportFd), sessionId, incoming); } ALOGE("Ran out of retries to connect to %s", addr.toString().c_str()); return UNKNOWN_ERROR; } status_t RpcSession::initAndAddConnection(unique_fd fd, const std::vector<uint8_t>& sessionId, status_t RpcSession::initAndAddConnection(TransportFd fd, const std::vector<uint8_t>& sessionId, bool incoming) { LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr); auto server = mCtx->newTransport(std::move(fd), mShutdownTrigger.get()); Loading
libs/binder/RpcTransportRaw.cpp +15 −13 Original line number Diff line number Diff line Loading @@ -36,11 +36,11 @@ constexpr size_t kMaxFdsPerMsg = 253; // RpcTransport with TLS disabled. class RpcTransportRaw : public RpcTransport { public: explicit RpcTransportRaw(android::base::unique_fd socket) : mSocket(std::move(socket)) {} explicit RpcTransportRaw(android::TransportFd socket) : mSocket(std::move(socket)) {} status_t pollRead(void) override { uint8_t buf; ssize_t ret = TEMP_FAILURE_RETRY( ::recv(mSocket.get(), &buf, sizeof(buf), MSG_PEEK | MSG_DONTWAIT)); ::recv(mSocket.fd.get(), &buf, sizeof(buf), MSG_PEEK | MSG_DONTWAIT)); if (ret < 0) { int savedErrno = errno; if (savedErrno == EAGAIN || savedErrno == EWOULDBLOCK) { Loading Loading @@ -100,7 +100,7 @@ public: msg.msg_controllen = CMSG_SPACE(fdsByteSize); ssize_t processedSize = TEMP_FAILURE_RETRY( sendmsg(mSocket.get(), &msg, MSG_NOSIGNAL | MSG_CMSG_CLOEXEC)); sendmsg(mSocket.fd.get(), &msg, MSG_NOSIGNAL | MSG_CMSG_CLOEXEC)); if (processedSize > 0) { sentFds = true; } Loading @@ -113,10 +113,10 @@ public: // non-negative int and can be cast to either. .msg_iovlen = static_cast<decltype(msg.msg_iovlen)>(niovs), }; return TEMP_FAILURE_RETRY(sendmsg(mSocket.get(), &msg, MSG_NOSIGNAL)); return TEMP_FAILURE_RETRY(sendmsg(mSocket.fd.get(), &msg, MSG_NOSIGNAL)); }; return interruptableReadOrWrite(mSocket.get(), fdTrigger, iovs, niovs, send, "sendmsg", POLLOUT, altPoll); return interruptableReadOrWrite(mSocket, fdTrigger, iovs, niovs, send, "sendmsg", POLLOUT, altPoll); } status_t interruptableReadFully( Loading @@ -135,7 +135,7 @@ public: .msg_controllen = sizeof(msgControlBuf), }; ssize_t processSize = TEMP_FAILURE_RETRY(recvmsg(mSocket.get(), &msg, MSG_NOSIGNAL)); TEMP_FAILURE_RETRY(recvmsg(mSocket.fd.get(), &msg, MSG_NOSIGNAL)); if (processSize < 0) { return -1; } Loading Loading @@ -171,21 +171,23 @@ public: // non-negative int and can be cast to either. .msg_iovlen = static_cast<decltype(msg.msg_iovlen)>(niovs), }; return TEMP_FAILURE_RETRY(recvmsg(mSocket.get(), &msg, MSG_NOSIGNAL)); return TEMP_FAILURE_RETRY(recvmsg(mSocket.fd.get(), &msg, MSG_NOSIGNAL)); }; return interruptableReadOrWrite(mSocket.get(), fdTrigger, iovs, niovs, recv, "recvmsg", POLLIN, altPoll); return interruptableReadOrWrite(mSocket, fdTrigger, iovs, niovs, recv, "recvmsg", POLLIN, altPoll); } virtual bool isWaiting() { return mSocket.isInPollingState(); } private: base::unique_fd mSocket; android::TransportFd mSocket; }; // RpcTransportCtx with TLS disabled. class RpcTransportCtxRaw : public RpcTransportCtx { public: std::unique_ptr<RpcTransport> newTransport(android::base::unique_fd fd, FdTrigger*) const { return std::make_unique<RpcTransportRaw>(std::move(fd)); std::unique_ptr<RpcTransport> newTransport(android::TransportFd socket, FdTrigger*) const { return std::make_unique<RpcTransportRaw>(std::move(socket)); } std::vector<uint8_t> getCertificate(RpcCertificateFormat) const override { return {}; } }; Loading