queueでpushとpopの連続実行時にエラー

フォーラム(掲示板)ルール
フォーラム(掲示板)ルールはこちら  ※コードを貼り付ける場合は [code][/code] で囲って下さい。詳しくはこちら
tosh

queueでpushとpopの連続実行時にエラー

#1

投稿記事 by tosh » 5年前

お世話になります。

キューを持つスレッドを作成し、メイン処理側で作成したデータを
スレッドのキューに追加して、スレッド側のキューから順次CSVファイルに
出力する処理を作成しております。

CSVファイルへの出力自体はできたのですが、下記main関数内のfor文の繰り返し件数を増やすと
途中で「deque iterator not dereferencable」という例外が発生し、それ以上出力できなくなります。

調べてみると、キューが空の場合にpopを行うと発生するらしいのですが、
事前にempty() == false の場合にしか処理をしていないのに、なぜ起きるのか分かりません。
(frontやpopの部分にtry catchを入れてみましたが、例外を検出できませんでした。)

以下に要約したコードを記しますので、誤り等ありましたらご指摘頂ければ幸いです。
よろしくお願いします。

コード:

//////////////////////////////////////////////////////////////////////////////
// メイン側
//////////////////////////////////////////////////////////////////////////////
Thread *CsvOutThread;
int main()
{

	// スレッドの生成と起動
	makeThread();

	for (int i = 0; i < 20000; i++)
	{
		// データ生成
		RECORD_T *pRecord;
		memset(pRecord, 0, sizeof(RECORD_T));
		
		// ここでpRecord編集
		
		// ここにデータ格納処理を入れる
		CsvOutThread->EnQueData(pRecord);

	}

    return 0;
}

//////////////////////////////////////////////////////////////////////////////
// スレッド側
//////////////////////////////////////////////////////////////////////////////
queue<RECORD_T *> DataQ;
int Thread::thmain()
{

	// スレッドメインループ
	while (m_blLoopFlg)
	{
		OutputData();
	}

	return 0;
}

//////////////////////////////////////////////////////////////////////////////
// データのキュー追加処理
//////////////////////////////////////////////////////////////////////////////
void Thread::EnQueData(RECORD_T *pRecord)
{
	// データをキューに追加する
	DataQ.push(pRecord);
}
//////////////////////////////////////////////////////////////////////////////
// データ出力
//////////////////////////////////////////////////////////////////////////////
void Thread::OutputData()
{

	if (DataQ.empty() == false)
	{
		RECORD_T* Record;

		// データをキューから出す
		Record = DataQ.front();

		// 編集してCSV出力する

		// 出力したデータをキューから削除
		DataQ.pop();
	}
}

tosh

Re: queueでpushとpopの連続実行時にエラー

#2

投稿記事 by tosh » 5年前

情報追加です。

例外が発生している個所は、データをキューから出すfront処理の部分で、
例外発生時にキューの中身を確認したところ、キューは空ではありませんでしたが
先頭のレコードだけがNULLになっていました。
pop処理をした後、少しの間先頭のレコードがNULLになる、という現象が起こり得るのでしょうか?

maru
記事: 150
登録日時: 13年前

Re: queueでpushとpopの連続実行時にエラー

#3

投稿記事 by maru » 5年前

質問です。
以下のコードでは RECORD_T の実体が確保されていないんですが、そこはいいんですか?
pRecord が不定ですけど。

コード:

		RECORD_T *pRecord;
		memset(pRecord, 0, sizeof(RECORD_T));

tosh

Re: queueでpushとpopの連続実行時にエラー

#4

投稿記事 by tosh » 5年前

maru様、ご指摘ありがとうございます。
要約する際に省いてしまいましたが、
pRecord = new RECORD_T;
が抜けていました。

かずま

Re: queueでpushとpopの連続実行時にエラー

#5

投稿記事 by かずま » 5年前

queue は、スレッドセーフではないので、
push、pop には排他制御が必要です。

スレッドは何を使っていますか?
Linux の pthread?
Windows の CreateThread や beginthread?

C++11 の <thread> を使って書いてみました。

コード:

#include <fstream>
#include <queue>
#include <thread>
#include <mutex>
using namespace std;

struct RECORD_T { int data1, data2, data3; };

class Thread {
	bool m_blLoopFlg;
	thread *m_th;
	mutex m_mtx;
	ofstream m_ofs;
public:
	void EnQueData(RECORD_T *pRecord);
	void OutputData();
	int thmain();
	void Init();
	void Wait();
	static void Start();
};

Thread *CsvOutThread;

void makeThread()
{
	static Thread th;
	CsvOutThread = &th;
	th.Init();
}

int main()
{
	makeThread();
	for (int i = 0; i < 20000; i++) {
		RECORD_T *pRecord = new RECORD_T;
		//memset(pRecord, 0, sizeof(RECORD_T));
		pRecord->data1 = i + 1;
		pRecord->data2 = -i;
		pRecord->data3 = i * 2;
		CsvOutThread->EnQueData(pRecord);
	}
	CsvOutThread->Wait();
    return 0;
}

queue<RECORD_T *> DataQ;

int Thread::thmain()
{
	while (m_blLoopFlg) {
		OutputData();
	}
	return 0;
}

void Thread::EnQueData(RECORD_T *pRecord)
{
	lock_guard<mutex> lock(m_mtx);  // 排他制御のために lock が必要
	DataQ.push(pRecord);
}

void Thread::OutputData()
{
	lock_guard<mutex> lock(m_mtx);  // 排他制御のために lock が必要
	if (!DataQ.empty()) {
		RECORD_T *p = DataQ.front();
		m_ofs << p->data1 << ',' << p->data2 << ',' << p->data3 << '\n';
		delete p;
		DataQ.pop();
	}
}

void Thread::Start()
{
	CsvOutThread->thmain();
}

void Thread::Init()
{
	m_ofs.open("outdata.csv");
	m_blLoopFlg = true;
	m_th = new thread(Thread::Start);
}

void Thread::Wait()
{
	this_thread::sleep_for(chrono::seconds(1));
	m_blLoopFlg = false;
	m_th->join();
}

dic
記事: 657
登録日時: 13年前
住所: 宮崎県
連絡を取る:

Re: queueでpushとpopの連続実行時にエラー

#6

投稿記事 by dic » 5年前

マルチスレッド環境では、同期処理について
勉強してください。
上の文章だけだと、コンパイルの設定も必要になってきます。
OS 言語は何を使っているのか教えてください。

tosh

Re: queueでpushとpopの連続実行時にエラー

#7

投稿記事 by tosh » 5年前

dic様、かずま様

ご指摘ありがとうございました。
FIFOのキューイングの場合、同時アクセス等は気にしなくても
良いと思っていたのですが、排他制御が必要だったのですね・・・

かずま様のコードに倣い、EnQueDataとOutputDataにそれぞれ
lock_guardを入れて動作させたところ、2万件の入出力を行っても
例外は発生せず、問題は解決ました。

皆様、ご協力ありがとうございました。

かずま

Re: queueでpushとpopの連続実行時にエラー

#8

投稿記事 by かずま » 5年前

Wait の中で、適当に 1秒待つようにしましたが、これは長すぎて、
ファイルへの出力がもう終わっているのに、OutputData() が
何度も何度も繰り返し呼ばれるので、次のように修正します。

コード:

#include <fstream>
#include <queue>
#include <thread>
#include <mutex>
using namespace std;

struct RECORD_T { int data1, data2, data3; };

class Thread {
	bool m_blLoopFlg;
	thread *m_th;
	mutex m_mtx;
	ofstream m_ofs;
public:
	void EnQueData(RECORD_T *pRecord);
	bool OutputData();
	int  thmain();
	void Init();
	void Wait();
	static void Start();
};

Thread *CsvOutThread;

void makeThread()
{
	static Thread th;
	CsvOutThread = &th;
	th.Init();
}

int main()
{
	makeThread();

	for (int i = 0; i < 20000; i++) {
		RECORD_T *pRecord = new RECORD_T;
		//memset(pRecord, 0, sizeof(RECORD_T));
		pRecord->data1 = i + 1;
		pRecord->data2 = -i;
		pRecord->data3 = i * 2;
		CsvOutThread->EnQueData(pRecord);
	}
	CsvOutThread->Wait();
    return 0;
}

queue<RECORD_T *> DataQ;

int Thread::thmain()
{
	while (m_blLoopFlg)
		if (OutputData())
			this_thread::sleep_for(chrono::milliseconds(10));
	return 0;
}

void Thread::EnQueData(RECORD_T *pRecord)
{
	lock_guard<mutex> lock(m_mtx);  // 排他制御のために lock が必要
	DataQ.push(pRecord);
}

bool Thread::OutputData()
{
	lock_guard<mutex> lock(m_mtx);  // 排他制御のために lock が必要
	if (DataQ.empty()) return true; // return empty
	RECORD_T *p = DataQ.front();
	m_ofs << p->data1 << ',' << p->data2 << ',' << DataQ.size()<< '\n';
	delete p;
	DataQ.pop();
	return false;  // return not empty
}

void Thread::Start()
{
	CsvOutThread->thmain();
}

void Thread::Init()
{
	m_ofs.open("outdata.csv");
	m_blLoopFlg = true;
	m_th = new thread(Thread::Start);
}

void Thread::Wait()
{
	while (1) {
		m_mtx.lock();
		bool e = DataQ.empty();
		m_mtx.unlock();
		if (e) break;
		this_thread::sleep_for(chrono::milliseconds(10));
	}
	m_blLoopFlg = false;
	m_th->join();
}
回答者が「スレッドは何を使っていますか?」と尋ねたり、
「OS 言語は何を使っているのか教えてください。」とお願いしているのに、
それらは無視されています。

また、問題が解決したのなら、その解決したコードを示してください。

返信

“C言語何でも質問掲示板” へ戻る