Loading libsysutils/include/sysutils/SocketClient.h +1 −4 Original line number Diff line number Diff line #ifndef _SOCKET_CLIENT_H #define _SOCKET_CLIENT_H #include "List.h" #include <pthread.h> #include <cutils/atomic.h> #include <sys/types.h> Loading Loading @@ -35,7 +33,7 @@ public: SocketClient(int sock, bool owned, bool useCmdNum); virtual ~SocketClient(); int getSocket() { return mSocket; } int getSocket() const { return mSocket; } pid_t getPid() const { return mPid; } uid_t getUid() const { return mUid; } gid_t getGid() const { return mGid; } Loading Loading @@ -84,5 +82,4 @@ private: int sendDataLockedv(struct iovec *iov, int iovcnt); }; typedef android::sysutils::List<SocketClient *> SocketClientCollection; #endif libsysutils/include/sysutils/SocketListener.h +4 −2 Original line number Diff line number Diff line /* * Copyright (C) 2008-2014 The Android Open Source Project * Copyright (C) 2008 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. Loading @@ -18,6 +18,8 @@ #include <pthread.h> #include <unordered_map> #include <sysutils/SocketClient.h> #include "SocketClientCommand.h" Loading @@ -25,7 +27,7 @@ class SocketListener { bool mListen; const char *mSocketName; int mSock; SocketClientCollection *mClients; std::unordered_map<int, SocketClient*> mClients; pthread_mutex_t mClientsLock; int mCtrlPipe[2]; pthread_t mThread; Loading libsysutils/src/SocketListener.cpp +63 −101 Original line number Diff line number Diff line /* * Copyright (C) 2008-2014 The Android Open Source Project * Copyright (C) 2008 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. Loading @@ -19,13 +19,15 @@ #include <errno.h> #include <stdio.h> #include <stdlib.h> #include <sys/select.h> #include <sys/poll.h> #include <sys/socket.h> #include <sys/time.h> #include <sys/types.h> #include <sys/un.h> #include <unistd.h> #include <vector> #include <cutils/sockets.h> #include <log/log.h> #include <sysutils/SocketListener.h> Loading @@ -52,7 +54,6 @@ void SocketListener::init(const char *socketName, int socketFd, bool listen, boo mSock = socketFd; mUseCmdNum = useCmdNum; pthread_mutex_init(&mClientsLock, NULL); mClients = new SocketClientCollection(); } SocketListener::~SocketListener() { Loading @@ -63,12 +64,9 @@ SocketListener::~SocketListener() { close(mCtrlPipe[0]); close(mCtrlPipe[1]); } SocketClientCollection::iterator it; for (it = mClients->begin(); it != mClients->end();) { (*it)->decRef(); it = mClients->erase(it); for (auto pair : mClients) { pair.second->decRef(); } delete mClients; } int SocketListener::startListener() { Loading @@ -95,7 +93,7 @@ int SocketListener::startListener(int backlog) { SLOGE("Unable to listen on socket (%s)", strerror(errno)); return -1; } else if (!mListen) mClients->push_back(new SocketClient(mSock, false, mUseCmdNum)); mClients[mSock] = new SocketClient(mSock, false, mUseCmdNum); if (pipe(mCtrlPipe)) { SLOGE("pipe failed (%s)", strerror(errno)); Loading Loading @@ -135,11 +133,10 @@ int SocketListener::stopListener() { mSock = -1; } SocketClientCollection::iterator it; for (it = mClients->begin(); it != mClients->end();) { delete (*it); it = mClients->erase(it); for (auto pair : mClients) { delete pair.second; } mClients.clear(); return 0; } Loading @@ -152,47 +149,30 @@ void *SocketListener::threadStart(void *obj) { } void SocketListener::runListener() { SocketClientCollection pendingList; while(1) { SocketClientCollection::iterator it; fd_set read_fds; int rc = 0; int max = -1; FD_ZERO(&read_fds); if (mListen) { max = mSock; FD_SET(mSock, &read_fds); } FD_SET(mCtrlPipe[0], &read_fds); if (mCtrlPipe[0] > max) max = mCtrlPipe[0]; while (true) { std::vector<pollfd> fds; pthread_mutex_lock(&mClientsLock); for (it = mClients->begin(); it != mClients->end(); ++it) { fds.reserve(2 + mClients.size()); fds.push_back({.fd = mCtrlPipe[0], .events = POLLIN}); if (mListen) fds.push_back({.fd = mSock, .events = POLLIN}); for (auto pair : mClients) { // NB: calling out to an other object with mClientsLock held (safe) int fd = (*it)->getSocket(); FD_SET(fd, &read_fds); if (fd > max) { max = fd; } const int fd = pair.second->getSocket(); if (fd != pair.first) SLOGE("fd mismatch: %d != %d", fd, pair.first); fds.push_back({.fd = fd, .events = POLLIN}); } pthread_mutex_unlock(&mClientsLock); SLOGV("mListen=%d, max=%d, mSocketName=%s", mListen, max, mSocketName); if ((rc = select(max + 1, &read_fds, NULL, NULL, NULL)) < 0) { if (errno == EINTR) continue; SLOGE("select failed (%s) mListen=%d, max=%d", strerror(errno), mListen, max); SLOGV("mListen=%d, mSocketName=%s", mListen, mSocketName); int rc = TEMP_FAILURE_RETRY(poll(fds.data(), fds.size(), -1)); if (rc < 0) { SLOGE("poll failed (%s) mListen=%d", strerror(errno), mListen); sleep(1); continue; } else if (!rc) continue; } if (FD_ISSET(mCtrlPipe[0], &read_fds)) { if (fds[0].revents & (POLLIN | POLLERR)) { char c = CtrlPipe_Shutdown; TEMP_FAILURE_RETRY(read(mCtrlPipe[0], &c, 1)); if (c == CtrlPipe_Shutdown) { Loading @@ -200,7 +180,7 @@ void SocketListener::runListener() { } continue; } if (mListen && FD_ISSET(mSock, &read_fds)) { if (mListen && (fds[1].revents & (POLLIN | POLLERR))) { int c = TEMP_FAILURE_RETRY(accept4(mSock, nullptr, nullptr, SOCK_CLOEXEC)); if (c < 0) { SLOGE("accept failed (%s)", strerror(errno)); Loading @@ -208,32 +188,33 @@ void SocketListener::runListener() { continue; } pthread_mutex_lock(&mClientsLock); mClients->push_back(new SocketClient(c, true, mUseCmdNum)); mClients[c] = new SocketClient(c, true, mUseCmdNum); pthread_mutex_unlock(&mClientsLock); } /* Add all active clients to the pending list first */ pendingList.clear(); // Add all active clients to the pending list first, so we can release // the lock before invoking the callbacks. std::vector<SocketClient*> pending; pthread_mutex_lock(&mClientsLock); for (it = mClients->begin(); it != mClients->end(); ++it) { SocketClient* c = *it; // NB: calling out to an other object with mClientsLock held (safe) int fd = c->getSocket(); if (FD_ISSET(fd, &read_fds)) { pendingList.push_back(c); const int size = fds.size(); for (int i = mListen ? 2 : 1; i < size; ++i) { const struct pollfd& p = fds[i]; if (p.events & (POLLIN | POLLERR)) { auto it = mClients.find(p.fd); if (it == mClients.end()) { SLOGE("fd vanished: %d", p.fd); continue; } SocketClient* c = it->second; pending.push_back(c); c->incRef(); } } pthread_mutex_unlock(&mClientsLock); /* Process the pending list, since it is owned by the thread, * there is no need to lock it */ while (!pendingList.empty()) { /* Pop the first item from the list */ it = pendingList.begin(); SocketClient* c = *it; pendingList.erase(it); /* Process it, if false is returned, remove from list */ for (SocketClient* c : pending) { // Process it, if false is returned, remove from the map SLOGV("processing fd %d", c->getSocket()); if (!onDataAvailable(c)) { release(c, false); } Loading @@ -246,17 +227,10 @@ bool SocketListener::release(SocketClient* c, bool wakeup) { bool ret = false; /* if our sockets are connection-based, remove and destroy it */ if (mListen && c) { /* Remove the client from our array */ /* Remove the client from our map */ SLOGV("going to zap %d for %s", c->getSocket(), mSocketName); pthread_mutex_lock(&mClientsLock); SocketClientCollection::iterator it; for (it = mClients->begin(); it != mClients->end(); ++it) { if (*it == c) { mClients->erase(it); ret = true; break; } } ret = (mClients.erase(c->getSocket()) != 0); pthread_mutex_unlock(&mClientsLock); if (ret) { ret = c->decRef(); Loading @@ -270,25 +244,19 @@ bool SocketListener::release(SocketClient* c, bool wakeup) { } void SocketListener::sendBroadcast(int code, const char *msg, bool addErrno) { SocketClientCollection safeList; /* Add all active clients to the safe list first */ safeList.clear(); // Add all clients to a separate list first, so we don't have to hold // the lock while processing it. std::vector<SocketClient*> clients; pthread_mutex_lock(&mClientsLock); SocketClientCollection::iterator i; for (i = mClients->begin(); i != mClients->end(); ++i) { SocketClient* c = *i; clients.reserve(mClients.size()); for (auto pair : mClients) { SocketClient* c = pair.second; c->incRef(); safeList.push_back(c); clients.push_back(c); } pthread_mutex_unlock(&mClientsLock); while (!safeList.empty()) { /* Pop the first item from the list */ i = safeList.begin(); SocketClient* c = *i; safeList.erase(i); for (SocketClient* c : clients) { // broadcasts are unsolicited and should not include a cmd number if (c->sendMsg(code, msg, addErrno, false)) { SLOGW("Error sending broadcast (%s)", strerror(errno)); Loading @@ -298,25 +266,19 @@ void SocketListener::sendBroadcast(int code, const char *msg, bool addErrno) { } void SocketListener::runOnEachSocket(SocketClientCommand *command) { SocketClientCollection safeList; /* Add all active clients to the safe list first */ safeList.clear(); // Add all clients to a separate list first, so we don't have to hold // the lock while processing it. std::vector<SocketClient*> clients; pthread_mutex_lock(&mClientsLock); SocketClientCollection::iterator i; for (i = mClients->begin(); i != mClients->end(); ++i) { SocketClient* c = *i; clients.reserve(mClients.size()); for (auto pair : mClients) { SocketClient* c = pair.second; c->incRef(); safeList.push_back(c); clients.push_back(c); } pthread_mutex_unlock(&mClientsLock); while (!safeList.empty()) { /* Pop the first item from the list */ i = safeList.begin(); SocketClient* c = *i; safeList.erase(i); for (SocketClient* c : clients) { command->runSocketCommand(c); c->decRef(); } Loading Loading
libsysutils/include/sysutils/SocketClient.h +1 −4 Original line number Diff line number Diff line #ifndef _SOCKET_CLIENT_H #define _SOCKET_CLIENT_H #include "List.h" #include <pthread.h> #include <cutils/atomic.h> #include <sys/types.h> Loading Loading @@ -35,7 +33,7 @@ public: SocketClient(int sock, bool owned, bool useCmdNum); virtual ~SocketClient(); int getSocket() { return mSocket; } int getSocket() const { return mSocket; } pid_t getPid() const { return mPid; } uid_t getUid() const { return mUid; } gid_t getGid() const { return mGid; } Loading Loading @@ -84,5 +82,4 @@ private: int sendDataLockedv(struct iovec *iov, int iovcnt); }; typedef android::sysutils::List<SocketClient *> SocketClientCollection; #endif
libsysutils/include/sysutils/SocketListener.h +4 −2 Original line number Diff line number Diff line /* * Copyright (C) 2008-2014 The Android Open Source Project * Copyright (C) 2008 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. Loading @@ -18,6 +18,8 @@ #include <pthread.h> #include <unordered_map> #include <sysutils/SocketClient.h> #include "SocketClientCommand.h" Loading @@ -25,7 +27,7 @@ class SocketListener { bool mListen; const char *mSocketName; int mSock; SocketClientCollection *mClients; std::unordered_map<int, SocketClient*> mClients; pthread_mutex_t mClientsLock; int mCtrlPipe[2]; pthread_t mThread; Loading
libsysutils/src/SocketListener.cpp +63 −101 Original line number Diff line number Diff line /* * Copyright (C) 2008-2014 The Android Open Source Project * Copyright (C) 2008 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. Loading @@ -19,13 +19,15 @@ #include <errno.h> #include <stdio.h> #include <stdlib.h> #include <sys/select.h> #include <sys/poll.h> #include <sys/socket.h> #include <sys/time.h> #include <sys/types.h> #include <sys/un.h> #include <unistd.h> #include <vector> #include <cutils/sockets.h> #include <log/log.h> #include <sysutils/SocketListener.h> Loading @@ -52,7 +54,6 @@ void SocketListener::init(const char *socketName, int socketFd, bool listen, boo mSock = socketFd; mUseCmdNum = useCmdNum; pthread_mutex_init(&mClientsLock, NULL); mClients = new SocketClientCollection(); } SocketListener::~SocketListener() { Loading @@ -63,12 +64,9 @@ SocketListener::~SocketListener() { close(mCtrlPipe[0]); close(mCtrlPipe[1]); } SocketClientCollection::iterator it; for (it = mClients->begin(); it != mClients->end();) { (*it)->decRef(); it = mClients->erase(it); for (auto pair : mClients) { pair.second->decRef(); } delete mClients; } int SocketListener::startListener() { Loading @@ -95,7 +93,7 @@ int SocketListener::startListener(int backlog) { SLOGE("Unable to listen on socket (%s)", strerror(errno)); return -1; } else if (!mListen) mClients->push_back(new SocketClient(mSock, false, mUseCmdNum)); mClients[mSock] = new SocketClient(mSock, false, mUseCmdNum); if (pipe(mCtrlPipe)) { SLOGE("pipe failed (%s)", strerror(errno)); Loading Loading @@ -135,11 +133,10 @@ int SocketListener::stopListener() { mSock = -1; } SocketClientCollection::iterator it; for (it = mClients->begin(); it != mClients->end();) { delete (*it); it = mClients->erase(it); for (auto pair : mClients) { delete pair.second; } mClients.clear(); return 0; } Loading @@ -152,47 +149,30 @@ void *SocketListener::threadStart(void *obj) { } void SocketListener::runListener() { SocketClientCollection pendingList; while(1) { SocketClientCollection::iterator it; fd_set read_fds; int rc = 0; int max = -1; FD_ZERO(&read_fds); if (mListen) { max = mSock; FD_SET(mSock, &read_fds); } FD_SET(mCtrlPipe[0], &read_fds); if (mCtrlPipe[0] > max) max = mCtrlPipe[0]; while (true) { std::vector<pollfd> fds; pthread_mutex_lock(&mClientsLock); for (it = mClients->begin(); it != mClients->end(); ++it) { fds.reserve(2 + mClients.size()); fds.push_back({.fd = mCtrlPipe[0], .events = POLLIN}); if (mListen) fds.push_back({.fd = mSock, .events = POLLIN}); for (auto pair : mClients) { // NB: calling out to an other object with mClientsLock held (safe) int fd = (*it)->getSocket(); FD_SET(fd, &read_fds); if (fd > max) { max = fd; } const int fd = pair.second->getSocket(); if (fd != pair.first) SLOGE("fd mismatch: %d != %d", fd, pair.first); fds.push_back({.fd = fd, .events = POLLIN}); } pthread_mutex_unlock(&mClientsLock); SLOGV("mListen=%d, max=%d, mSocketName=%s", mListen, max, mSocketName); if ((rc = select(max + 1, &read_fds, NULL, NULL, NULL)) < 0) { if (errno == EINTR) continue; SLOGE("select failed (%s) mListen=%d, max=%d", strerror(errno), mListen, max); SLOGV("mListen=%d, mSocketName=%s", mListen, mSocketName); int rc = TEMP_FAILURE_RETRY(poll(fds.data(), fds.size(), -1)); if (rc < 0) { SLOGE("poll failed (%s) mListen=%d", strerror(errno), mListen); sleep(1); continue; } else if (!rc) continue; } if (FD_ISSET(mCtrlPipe[0], &read_fds)) { if (fds[0].revents & (POLLIN | POLLERR)) { char c = CtrlPipe_Shutdown; TEMP_FAILURE_RETRY(read(mCtrlPipe[0], &c, 1)); if (c == CtrlPipe_Shutdown) { Loading @@ -200,7 +180,7 @@ void SocketListener::runListener() { } continue; } if (mListen && FD_ISSET(mSock, &read_fds)) { if (mListen && (fds[1].revents & (POLLIN | POLLERR))) { int c = TEMP_FAILURE_RETRY(accept4(mSock, nullptr, nullptr, SOCK_CLOEXEC)); if (c < 0) { SLOGE("accept failed (%s)", strerror(errno)); Loading @@ -208,32 +188,33 @@ void SocketListener::runListener() { continue; } pthread_mutex_lock(&mClientsLock); mClients->push_back(new SocketClient(c, true, mUseCmdNum)); mClients[c] = new SocketClient(c, true, mUseCmdNum); pthread_mutex_unlock(&mClientsLock); } /* Add all active clients to the pending list first */ pendingList.clear(); // Add all active clients to the pending list first, so we can release // the lock before invoking the callbacks. std::vector<SocketClient*> pending; pthread_mutex_lock(&mClientsLock); for (it = mClients->begin(); it != mClients->end(); ++it) { SocketClient* c = *it; // NB: calling out to an other object with mClientsLock held (safe) int fd = c->getSocket(); if (FD_ISSET(fd, &read_fds)) { pendingList.push_back(c); const int size = fds.size(); for (int i = mListen ? 2 : 1; i < size; ++i) { const struct pollfd& p = fds[i]; if (p.events & (POLLIN | POLLERR)) { auto it = mClients.find(p.fd); if (it == mClients.end()) { SLOGE("fd vanished: %d", p.fd); continue; } SocketClient* c = it->second; pending.push_back(c); c->incRef(); } } pthread_mutex_unlock(&mClientsLock); /* Process the pending list, since it is owned by the thread, * there is no need to lock it */ while (!pendingList.empty()) { /* Pop the first item from the list */ it = pendingList.begin(); SocketClient* c = *it; pendingList.erase(it); /* Process it, if false is returned, remove from list */ for (SocketClient* c : pending) { // Process it, if false is returned, remove from the map SLOGV("processing fd %d", c->getSocket()); if (!onDataAvailable(c)) { release(c, false); } Loading @@ -246,17 +227,10 @@ bool SocketListener::release(SocketClient* c, bool wakeup) { bool ret = false; /* if our sockets are connection-based, remove and destroy it */ if (mListen && c) { /* Remove the client from our array */ /* Remove the client from our map */ SLOGV("going to zap %d for %s", c->getSocket(), mSocketName); pthread_mutex_lock(&mClientsLock); SocketClientCollection::iterator it; for (it = mClients->begin(); it != mClients->end(); ++it) { if (*it == c) { mClients->erase(it); ret = true; break; } } ret = (mClients.erase(c->getSocket()) != 0); pthread_mutex_unlock(&mClientsLock); if (ret) { ret = c->decRef(); Loading @@ -270,25 +244,19 @@ bool SocketListener::release(SocketClient* c, bool wakeup) { } void SocketListener::sendBroadcast(int code, const char *msg, bool addErrno) { SocketClientCollection safeList; /* Add all active clients to the safe list first */ safeList.clear(); // Add all clients to a separate list first, so we don't have to hold // the lock while processing it. std::vector<SocketClient*> clients; pthread_mutex_lock(&mClientsLock); SocketClientCollection::iterator i; for (i = mClients->begin(); i != mClients->end(); ++i) { SocketClient* c = *i; clients.reserve(mClients.size()); for (auto pair : mClients) { SocketClient* c = pair.second; c->incRef(); safeList.push_back(c); clients.push_back(c); } pthread_mutex_unlock(&mClientsLock); while (!safeList.empty()) { /* Pop the first item from the list */ i = safeList.begin(); SocketClient* c = *i; safeList.erase(i); for (SocketClient* c : clients) { // broadcasts are unsolicited and should not include a cmd number if (c->sendMsg(code, msg, addErrno, false)) { SLOGW("Error sending broadcast (%s)", strerror(errno)); Loading @@ -298,25 +266,19 @@ void SocketListener::sendBroadcast(int code, const char *msg, bool addErrno) { } void SocketListener::runOnEachSocket(SocketClientCommand *command) { SocketClientCollection safeList; /* Add all active clients to the safe list first */ safeList.clear(); // Add all clients to a separate list first, so we don't have to hold // the lock while processing it. std::vector<SocketClient*> clients; pthread_mutex_lock(&mClientsLock); SocketClientCollection::iterator i; for (i = mClients->begin(); i != mClients->end(); ++i) { SocketClient* c = *i; clients.reserve(mClients.size()); for (auto pair : mClients) { SocketClient* c = pair.second; c->incRef(); safeList.push_back(c); clients.push_back(c); } pthread_mutex_unlock(&mClientsLock); while (!safeList.empty()) { /* Pop the first item from the list */ i = safeList.begin(); SocketClient* c = *i; safeList.erase(i); for (SocketClient* c : clients) { command->runSocketCommand(c); c->decRef(); } Loading