Loading libsysutils/include/sysutils/SocketClient.h +4 −1 Original line number Diff line number Diff line #ifndef _SOCKET_CLIENT_H #define _SOCKET_CLIENT_H #include "List.h" #include <pthread.h> #include <cutils/atomic.h> #include <sys/types.h> Loading Loading @@ -33,7 +35,7 @@ public: SocketClient(int sock, bool owned, bool useCmdNum); virtual ~SocketClient(); int getSocket() const { return mSocket; } int getSocket() { return mSocket; } pid_t getPid() const { return mPid; } uid_t getUid() const { return mUid; } gid_t getGid() const { return mGid; } Loading Loading @@ -82,4 +84,5 @@ private: int sendDataLockedv(struct iovec *iov, int iovcnt); }; typedef android::sysutils::List<SocketClient *> SocketClientCollection; #endif libsysutils/include/sysutils/SocketListener.h +2 −4 Original line number Diff line number Diff line /* * Copyright (C) 2008 The Android Open Source Project * Copyright (C) 2008-2014 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. Loading @@ -18,8 +18,6 @@ #include <pthread.h> #include <unordered_map> #include <sysutils/SocketClient.h> #include "SocketClientCommand.h" Loading @@ -27,7 +25,7 @@ class SocketListener { bool mListen; const char *mSocketName; int mSock; std::unordered_map<int, SocketClient*> mClients; SocketClientCollection *mClients; pthread_mutex_t mClientsLock; int mCtrlPipe[2]; pthread_t mThread; Loading libsysutils/src/SocketListener.cpp +101 −63 Original line number Diff line number Diff line /* * Copyright (C) 2008 The Android Open Source Project * Copyright (C) 2008-2014 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. Loading @@ -19,15 +19,13 @@ #include <errno.h> #include <stdio.h> #include <stdlib.h> #include <sys/poll.h> #include <sys/select.h> #include <sys/socket.h> #include <sys/time.h> #include <sys/types.h> #include <sys/un.h> #include <unistd.h> #include <vector> #include <cutils/sockets.h> #include <log/log.h> #include <sysutils/SocketListener.h> Loading @@ -54,6 +52,7 @@ void SocketListener::init(const char *socketName, int socketFd, bool listen, boo mSock = socketFd; mUseCmdNum = useCmdNum; pthread_mutex_init(&mClientsLock, NULL); mClients = new SocketClientCollection(); } SocketListener::~SocketListener() { Loading @@ -64,9 +63,12 @@ SocketListener::~SocketListener() { close(mCtrlPipe[0]); close(mCtrlPipe[1]); } for (auto pair : mClients) { pair.second->decRef(); SocketClientCollection::iterator it; for (it = mClients->begin(); it != mClients->end();) { (*it)->decRef(); it = mClients->erase(it); } delete mClients; } int SocketListener::startListener() { Loading @@ -93,7 +95,7 @@ int SocketListener::startListener(int backlog) { SLOGE("Unable to listen on socket (%s)", strerror(errno)); return -1; } else if (!mListen) mClients[mSock] = new SocketClient(mSock, false, mUseCmdNum); mClients->push_back(new SocketClient(mSock, false, mUseCmdNum)); if (pipe(mCtrlPipe)) { SLOGE("pipe failed (%s)", strerror(errno)); Loading Loading @@ -133,10 +135,11 @@ int SocketListener::stopListener() { mSock = -1; } for (auto pair : mClients) { delete pair.second; SocketClientCollection::iterator it; for (it = mClients->begin(); it != mClients->end();) { delete (*it); it = mClients->erase(it); } mClients.clear(); return 0; } Loading @@ -149,30 +152,47 @@ void *SocketListener::threadStart(void *obj) { } void SocketListener::runListener() { while (true) { std::vector<pollfd> fds; SocketClientCollection pendingList; while(1) { SocketClientCollection::iterator it; fd_set read_fds; int rc = 0; int max = -1; FD_ZERO(&read_fds); if (mListen) { max = mSock; FD_SET(mSock, &read_fds); } FD_SET(mCtrlPipe[0], &read_fds); if (mCtrlPipe[0] > max) max = mCtrlPipe[0]; pthread_mutex_lock(&mClientsLock); fds.reserve(2 + mClients.size()); fds.push_back({.fd = mCtrlPipe[0], .events = POLLIN}); if (mListen) fds.push_back({.fd = mSock, .events = POLLIN}); for (auto pair : mClients) { for (it = mClients->begin(); it != mClients->end(); ++it) { // NB: calling out to an other object with mClientsLock held (safe) const int fd = pair.second->getSocket(); if (fd != pair.first) SLOGE("fd mismatch: %d != %d", fd, pair.first); fds.push_back({.fd = fd, .events = POLLIN}); int fd = (*it)->getSocket(); FD_SET(fd, &read_fds); if (fd > max) { max = fd; } } pthread_mutex_unlock(&mClientsLock); SLOGV("mListen=%d, mSocketName=%s", mListen, mSocketName); int rc = TEMP_FAILURE_RETRY(poll(fds.data(), fds.size(), -1)); if (rc < 0) { SLOGE("poll failed (%s) mListen=%d", strerror(errno), mListen); SLOGV("mListen=%d, max=%d, mSocketName=%s", mListen, max, mSocketName); if ((rc = select(max + 1, &read_fds, NULL, NULL, NULL)) < 0) { if (errno == EINTR) continue; SLOGE("select failed (%s) mListen=%d, max=%d", strerror(errno), mListen, max); sleep(1); continue; } } else if (!rc) continue; if (fds[0].revents & (POLLIN | POLLERR)) { if (FD_ISSET(mCtrlPipe[0], &read_fds)) { char c = CtrlPipe_Shutdown; TEMP_FAILURE_RETRY(read(mCtrlPipe[0], &c, 1)); if (c == CtrlPipe_Shutdown) { Loading @@ -180,7 +200,7 @@ void SocketListener::runListener() { } continue; } if (mListen && (fds[1].revents & (POLLIN | POLLERR))) { if (mListen && FD_ISSET(mSock, &read_fds)) { int c = TEMP_FAILURE_RETRY(accept4(mSock, nullptr, nullptr, SOCK_CLOEXEC)); if (c < 0) { SLOGE("accept failed (%s)", strerror(errno)); Loading @@ -188,33 +208,32 @@ void SocketListener::runListener() { continue; } pthread_mutex_lock(&mClientsLock); mClients[c] = new SocketClient(c, true, mUseCmdNum); mClients->push_back(new SocketClient(c, true, mUseCmdNum)); pthread_mutex_unlock(&mClientsLock); } // Add all active clients to the pending list first, so we can release // the lock before invoking the callbacks. std::vector<SocketClient*> pending; /* Add all active clients to the pending list first */ pendingList.clear(); pthread_mutex_lock(&mClientsLock); const int size = fds.size(); for (int i = mListen ? 2 : 1; i < size; ++i) { const struct pollfd& p = fds[i]; if (p.events & (POLLIN | POLLERR)) { auto it = mClients.find(p.fd); if (it == mClients.end()) { SLOGE("fd vanished: %d", p.fd); continue; } SocketClient* c = it->second; pending.push_back(c); for (it = mClients->begin(); it != mClients->end(); ++it) { SocketClient* c = *it; // NB: calling out to an other object with mClientsLock held (safe) int fd = c->getSocket(); if (FD_ISSET(fd, &read_fds)) { pendingList.push_back(c); c->incRef(); } } pthread_mutex_unlock(&mClientsLock); for (SocketClient* c : pending) { // Process it, if false is returned, remove from the map SLOGV("processing fd %d", c->getSocket()); /* Process the pending list, since it is owned by the thread, * there is no need to lock it */ while (!pendingList.empty()) { /* Pop the first item from the list */ it = pendingList.begin(); SocketClient* c = *it; pendingList.erase(it); /* Process it, if false is returned, remove from list */ if (!onDataAvailable(c)) { release(c, false); } Loading @@ -227,10 +246,17 @@ bool SocketListener::release(SocketClient* c, bool wakeup) { bool ret = false; /* if our sockets are connection-based, remove and destroy it */ if (mListen && c) { /* Remove the client from our map */ /* Remove the client from our array */ SLOGV("going to zap %d for %s", c->getSocket(), mSocketName); pthread_mutex_lock(&mClientsLock); ret = (mClients.erase(c->getSocket()) != 0); SocketClientCollection::iterator it; for (it = mClients->begin(); it != mClients->end(); ++it) { if (*it == c) { mClients->erase(it); ret = true; break; } } pthread_mutex_unlock(&mClientsLock); if (ret) { ret = c->decRef(); Loading @@ -244,19 +270,25 @@ bool SocketListener::release(SocketClient* c, bool wakeup) { } void SocketListener::sendBroadcast(int code, const char *msg, bool addErrno) { // Add all clients to a separate list first, so we don't have to hold // the lock while processing it. std::vector<SocketClient*> clients; SocketClientCollection safeList; /* Add all active clients to the safe list first */ safeList.clear(); pthread_mutex_lock(&mClientsLock); clients.reserve(mClients.size()); for (auto pair : mClients) { SocketClient* c = pair.second; SocketClientCollection::iterator i; for (i = mClients->begin(); i != mClients->end(); ++i) { SocketClient* c = *i; c->incRef(); clients.push_back(c); safeList.push_back(c); } pthread_mutex_unlock(&mClientsLock); for (SocketClient* c : clients) { while (!safeList.empty()) { /* Pop the first item from the list */ i = safeList.begin(); SocketClient* c = *i; safeList.erase(i); // broadcasts are unsolicited and should not include a cmd number if (c->sendMsg(code, msg, addErrno, false)) { SLOGW("Error sending broadcast (%s)", strerror(errno)); Loading @@ -266,19 +298,25 @@ void SocketListener::sendBroadcast(int code, const char *msg, bool addErrno) { } void SocketListener::runOnEachSocket(SocketClientCommand *command) { // Add all clients to a separate list first, so we don't have to hold // the lock while processing it. std::vector<SocketClient*> clients; SocketClientCollection safeList; /* Add all active clients to the safe list first */ safeList.clear(); pthread_mutex_lock(&mClientsLock); clients.reserve(mClients.size()); for (auto pair : mClients) { SocketClient* c = pair.second; SocketClientCollection::iterator i; for (i = mClients->begin(); i != mClients->end(); ++i) { SocketClient* c = *i; c->incRef(); clients.push_back(c); safeList.push_back(c); } pthread_mutex_unlock(&mClientsLock); for (SocketClient* c : clients) { while (!safeList.empty()) { /* Pop the first item from the list */ i = safeList.begin(); SocketClient* c = *i; safeList.erase(i); command->runSocketCommand(c); c->decRef(); } Loading Loading
libsysutils/include/sysutils/SocketClient.h +4 −1 Original line number Diff line number Diff line #ifndef _SOCKET_CLIENT_H #define _SOCKET_CLIENT_H #include "List.h" #include <pthread.h> #include <cutils/atomic.h> #include <sys/types.h> Loading Loading @@ -33,7 +35,7 @@ public: SocketClient(int sock, bool owned, bool useCmdNum); virtual ~SocketClient(); int getSocket() const { return mSocket; } int getSocket() { return mSocket; } pid_t getPid() const { return mPid; } uid_t getUid() const { return mUid; } gid_t getGid() const { return mGid; } Loading Loading @@ -82,4 +84,5 @@ private: int sendDataLockedv(struct iovec *iov, int iovcnt); }; typedef android::sysutils::List<SocketClient *> SocketClientCollection; #endif
libsysutils/include/sysutils/SocketListener.h +2 −4 Original line number Diff line number Diff line /* * Copyright (C) 2008 The Android Open Source Project * Copyright (C) 2008-2014 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. Loading @@ -18,8 +18,6 @@ #include <pthread.h> #include <unordered_map> #include <sysutils/SocketClient.h> #include "SocketClientCommand.h" Loading @@ -27,7 +25,7 @@ class SocketListener { bool mListen; const char *mSocketName; int mSock; std::unordered_map<int, SocketClient*> mClients; SocketClientCollection *mClients; pthread_mutex_t mClientsLock; int mCtrlPipe[2]; pthread_t mThread; Loading
libsysutils/src/SocketListener.cpp +101 −63 Original line number Diff line number Diff line /* * Copyright (C) 2008 The Android Open Source Project * Copyright (C) 2008-2014 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. Loading @@ -19,15 +19,13 @@ #include <errno.h> #include <stdio.h> #include <stdlib.h> #include <sys/poll.h> #include <sys/select.h> #include <sys/socket.h> #include <sys/time.h> #include <sys/types.h> #include <sys/un.h> #include <unistd.h> #include <vector> #include <cutils/sockets.h> #include <log/log.h> #include <sysutils/SocketListener.h> Loading @@ -54,6 +52,7 @@ void SocketListener::init(const char *socketName, int socketFd, bool listen, boo mSock = socketFd; mUseCmdNum = useCmdNum; pthread_mutex_init(&mClientsLock, NULL); mClients = new SocketClientCollection(); } SocketListener::~SocketListener() { Loading @@ -64,9 +63,12 @@ SocketListener::~SocketListener() { close(mCtrlPipe[0]); close(mCtrlPipe[1]); } for (auto pair : mClients) { pair.second->decRef(); SocketClientCollection::iterator it; for (it = mClients->begin(); it != mClients->end();) { (*it)->decRef(); it = mClients->erase(it); } delete mClients; } int SocketListener::startListener() { Loading @@ -93,7 +95,7 @@ int SocketListener::startListener(int backlog) { SLOGE("Unable to listen on socket (%s)", strerror(errno)); return -1; } else if (!mListen) mClients[mSock] = new SocketClient(mSock, false, mUseCmdNum); mClients->push_back(new SocketClient(mSock, false, mUseCmdNum)); if (pipe(mCtrlPipe)) { SLOGE("pipe failed (%s)", strerror(errno)); Loading Loading @@ -133,10 +135,11 @@ int SocketListener::stopListener() { mSock = -1; } for (auto pair : mClients) { delete pair.second; SocketClientCollection::iterator it; for (it = mClients->begin(); it != mClients->end();) { delete (*it); it = mClients->erase(it); } mClients.clear(); return 0; } Loading @@ -149,30 +152,47 @@ void *SocketListener::threadStart(void *obj) { } void SocketListener::runListener() { while (true) { std::vector<pollfd> fds; SocketClientCollection pendingList; while(1) { SocketClientCollection::iterator it; fd_set read_fds; int rc = 0; int max = -1; FD_ZERO(&read_fds); if (mListen) { max = mSock; FD_SET(mSock, &read_fds); } FD_SET(mCtrlPipe[0], &read_fds); if (mCtrlPipe[0] > max) max = mCtrlPipe[0]; pthread_mutex_lock(&mClientsLock); fds.reserve(2 + mClients.size()); fds.push_back({.fd = mCtrlPipe[0], .events = POLLIN}); if (mListen) fds.push_back({.fd = mSock, .events = POLLIN}); for (auto pair : mClients) { for (it = mClients->begin(); it != mClients->end(); ++it) { // NB: calling out to an other object with mClientsLock held (safe) const int fd = pair.second->getSocket(); if (fd != pair.first) SLOGE("fd mismatch: %d != %d", fd, pair.first); fds.push_back({.fd = fd, .events = POLLIN}); int fd = (*it)->getSocket(); FD_SET(fd, &read_fds); if (fd > max) { max = fd; } } pthread_mutex_unlock(&mClientsLock); SLOGV("mListen=%d, mSocketName=%s", mListen, mSocketName); int rc = TEMP_FAILURE_RETRY(poll(fds.data(), fds.size(), -1)); if (rc < 0) { SLOGE("poll failed (%s) mListen=%d", strerror(errno), mListen); SLOGV("mListen=%d, max=%d, mSocketName=%s", mListen, max, mSocketName); if ((rc = select(max + 1, &read_fds, NULL, NULL, NULL)) < 0) { if (errno == EINTR) continue; SLOGE("select failed (%s) mListen=%d, max=%d", strerror(errno), mListen, max); sleep(1); continue; } } else if (!rc) continue; if (fds[0].revents & (POLLIN | POLLERR)) { if (FD_ISSET(mCtrlPipe[0], &read_fds)) { char c = CtrlPipe_Shutdown; TEMP_FAILURE_RETRY(read(mCtrlPipe[0], &c, 1)); if (c == CtrlPipe_Shutdown) { Loading @@ -180,7 +200,7 @@ void SocketListener::runListener() { } continue; } if (mListen && (fds[1].revents & (POLLIN | POLLERR))) { if (mListen && FD_ISSET(mSock, &read_fds)) { int c = TEMP_FAILURE_RETRY(accept4(mSock, nullptr, nullptr, SOCK_CLOEXEC)); if (c < 0) { SLOGE("accept failed (%s)", strerror(errno)); Loading @@ -188,33 +208,32 @@ void SocketListener::runListener() { continue; } pthread_mutex_lock(&mClientsLock); mClients[c] = new SocketClient(c, true, mUseCmdNum); mClients->push_back(new SocketClient(c, true, mUseCmdNum)); pthread_mutex_unlock(&mClientsLock); } // Add all active clients to the pending list first, so we can release // the lock before invoking the callbacks. std::vector<SocketClient*> pending; /* Add all active clients to the pending list first */ pendingList.clear(); pthread_mutex_lock(&mClientsLock); const int size = fds.size(); for (int i = mListen ? 2 : 1; i < size; ++i) { const struct pollfd& p = fds[i]; if (p.events & (POLLIN | POLLERR)) { auto it = mClients.find(p.fd); if (it == mClients.end()) { SLOGE("fd vanished: %d", p.fd); continue; } SocketClient* c = it->second; pending.push_back(c); for (it = mClients->begin(); it != mClients->end(); ++it) { SocketClient* c = *it; // NB: calling out to an other object with mClientsLock held (safe) int fd = c->getSocket(); if (FD_ISSET(fd, &read_fds)) { pendingList.push_back(c); c->incRef(); } } pthread_mutex_unlock(&mClientsLock); for (SocketClient* c : pending) { // Process it, if false is returned, remove from the map SLOGV("processing fd %d", c->getSocket()); /* Process the pending list, since it is owned by the thread, * there is no need to lock it */ while (!pendingList.empty()) { /* Pop the first item from the list */ it = pendingList.begin(); SocketClient* c = *it; pendingList.erase(it); /* Process it, if false is returned, remove from list */ if (!onDataAvailable(c)) { release(c, false); } Loading @@ -227,10 +246,17 @@ bool SocketListener::release(SocketClient* c, bool wakeup) { bool ret = false; /* if our sockets are connection-based, remove and destroy it */ if (mListen && c) { /* Remove the client from our map */ /* Remove the client from our array */ SLOGV("going to zap %d for %s", c->getSocket(), mSocketName); pthread_mutex_lock(&mClientsLock); ret = (mClients.erase(c->getSocket()) != 0); SocketClientCollection::iterator it; for (it = mClients->begin(); it != mClients->end(); ++it) { if (*it == c) { mClients->erase(it); ret = true; break; } } pthread_mutex_unlock(&mClientsLock); if (ret) { ret = c->decRef(); Loading @@ -244,19 +270,25 @@ bool SocketListener::release(SocketClient* c, bool wakeup) { } void SocketListener::sendBroadcast(int code, const char *msg, bool addErrno) { // Add all clients to a separate list first, so we don't have to hold // the lock while processing it. std::vector<SocketClient*> clients; SocketClientCollection safeList; /* Add all active clients to the safe list first */ safeList.clear(); pthread_mutex_lock(&mClientsLock); clients.reserve(mClients.size()); for (auto pair : mClients) { SocketClient* c = pair.second; SocketClientCollection::iterator i; for (i = mClients->begin(); i != mClients->end(); ++i) { SocketClient* c = *i; c->incRef(); clients.push_back(c); safeList.push_back(c); } pthread_mutex_unlock(&mClientsLock); for (SocketClient* c : clients) { while (!safeList.empty()) { /* Pop the first item from the list */ i = safeList.begin(); SocketClient* c = *i; safeList.erase(i); // broadcasts are unsolicited and should not include a cmd number if (c->sendMsg(code, msg, addErrno, false)) { SLOGW("Error sending broadcast (%s)", strerror(errno)); Loading @@ -266,19 +298,25 @@ void SocketListener::sendBroadcast(int code, const char *msg, bool addErrno) { } void SocketListener::runOnEachSocket(SocketClientCommand *command) { // Add all clients to a separate list first, so we don't have to hold // the lock while processing it. std::vector<SocketClient*> clients; SocketClientCollection safeList; /* Add all active clients to the safe list first */ safeList.clear(); pthread_mutex_lock(&mClientsLock); clients.reserve(mClients.size()); for (auto pair : mClients) { SocketClient* c = pair.second; SocketClientCollection::iterator i; for (i = mClients->begin(); i != mClients->end(); ++i) { SocketClient* c = *i; c->incRef(); clients.push_back(c); safeList.push_back(c); } pthread_mutex_unlock(&mClientsLock); for (SocketClient* c : clients) { while (!safeList.empty()) { /* Pop the first item from the list */ i = safeList.begin(); SocketClient* c = *i; safeList.erase(i); command->runSocketCommand(c); c->decRef(); } Loading